Partilhar via


create_auto_cdc_flow

A create_auto_cdc_flow() função cria um fluxo que usa a funcionalidade CDC (captura de dados de mudanças) do Lakeflow Spark Declarative Pipelines para processar dados de origem de um feed de dados de alteração (CDF).

Observação

Esta função substitui a função apply_changes()anterior. As duas funções têm a mesma assinatura. O Databricks recomenda a atualização para usar o novo nome.

Importante

Você deve declarar uma tabela de streaming de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o create_auto_cdc_flow() esquema da tabela de destino, você deve incluir as __START_AT colunas e __END_AT com o mesmo tipo de dados que os sequence_by campos.

Para criar a tabela de destino necessária, você pode usar a função create_streaming_table() na interface Python do pipeline.

Sintaxe

from pyspark import pipelines as dp

dp.create_auto_cdc_flow(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = <bool>,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None,
  name = None,
  once = <bool>
)

Para create_auto_cdc_flow processamento, o comportamento padrão para INSERT eventos e UPDATE é atualizar eventos CDC da origem: atualizar quaisquer linhas na tabela de destino que correspondam à(s) chave(s) especificada(s) ou inserir uma nova linha quando um registro correspondente não existir na tabela de destino. A manipulação de DELETE eventos pode ser especificada com o apply_as_deletes parâmetro.

Para saber mais sobre o processamento de CDC com um feed de alterações, consulte As APIs AUTO CDC: Simplifique a captura de dados de alterações com pipelines. Para obter um exemplo de uso da create_auto_cdc_flow() função, consulte Exemplo: processamento de SCD tipo 1 e SCD tipo 2 com dados de origem CDF.

Parâmetros

Parâmetro Tipo Description
target str Required. O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a create_auto_cdc_flow() função.
source str Required. A fonte de dados que contém os registros do CDC.
keys list Required. A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino. Você pode especificar:
  • Uma lista de cadeias de caracteres: ["userId", "orderId"]
  • Uma lista de funções do Spark SQL col() : [col("userId"), col("orderId")]. Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).
sequence_by str, col() ou struct() Required. Os nomes das colunas que especificam a ordem lógica dos eventos CDC nos dados fonte. O Lakeflow Spark Declarative Pipelines usa esse sequenciamento para manipular eventos de alteração que chegam fora de ordem. A coluna especificada deve ser um tipo de dados classificável. Você pode especificar:
  • Uma cadeia de caracteres: "sequenceNum"
  • Uma função Spark SQL col() : col("sequenceNum"). Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).
  • A struct() combinar várias colunas para resolver empates: struct("timestamp_col", "id_col"), ele ordenará primeiro pelo campo struct, depois pelo segundo campo se houver um empate, e assim por diante.
ignore_null_updates bool Permitir a importação de atualizações contendo um subconjunto das colunas de destino. Quando um evento CDC corresponde a uma linha existente e ignore_null_updates é True, as colunas com um null mantêm os seus valores existentes no destino. Isso também se aplica a colunas aninhadas com um valor de null. Quando ignore_null_updates é False, os valores existentes são substituídos por null valores.
A predefinição é False.
apply_as_deletes str ou expr() Especifica quando um evento CDC deve ser tratado como um DELETE em vez de um upsert. Você pode especificar:
  • Uma cadeia de caracteres: "Operation = 'DELETE'"
  • Uma função Spark SQL expr() : expr("Operation = 'DELETE'")

Para lidar com dados fora de ordem, a linha excluída é temporariamente mantida como uma marca de exclusão na tabela Delta subjacente e uma exibição é criada no metastore que filtra essas lápides. O intervalo de retenção é definido por padrão para dois dias e pode ser configurado com a propriedade da tabela pipelines.cdc.tombstoneGCThresholdInSeconds.
apply_as_truncates str ou expr() Especifica quando um evento CDC deve ser tratado como uma tabela TRUNCATEcompleta. Você pode especificar:
  • Uma cadeia de caracteres: "Operation = 'TRUNCATE'"
  • Uma função Spark SQL expr() : expr("Operation = 'TRUNCATE'")

Como essa cláusula aciona um truncado completo da tabela de destino, ela deve ser usada apenas para casos de uso específicos que exijam essa funcionalidade. O apply_as_truncates parâmetro é suportado apenas para SCD tipo 1. SCD tipo 2 não suporta operações de truncamento.
column_list ou except_column_list list Um subconjunto de colunas a serem incluídas na tabela de destino. Use column_list para especificar a lista completa de colunas a serem incluídas. Use except_column_list para especificar as colunas a serem excluídas. Você pode declarar qualquer valor como uma lista de cadeias de caracteres ou como funções do Spark SQL col() :
  • column_list = ["userId", "name", "city"]
  • column_list = [col("userId"), col("name"), col("city")]
  • except_column_list = ["operation", "sequenceNum"]
  • except_column_list = [col("operation"), col("sequenceNum")

Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento column_list ou except_column_list é passado para a função.
stored_as_scd_type str ou int Se deseja armazenar registros como SCD tipo 1 ou SCD tipo 2. Definido como 1 para SCD tipo 1 ou 2 para SCD tipo 2. O padrão é SCD tipo 1.
track_history_column_list ou track_history_except_column_list list Um subconjunto de colunas de saída a serem acompanhadas para o histórico na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Use track_history_except_column_list para especificar as colunas a serem excluídas do rastreamento. Você pode declarar qualquer valor como uma lista de cadeias de caracteres ou como funções do Spark SQL col() :
  • track_history_column_list = ["userId", "name", "city"]
  • track_history_column_list = [col("userId"), col("name"), col("city")]
  • track_history_except_column_list = ["operation", "sequenceNum"]
  • track_history_except_column_list = [col("operation"), col("sequenceNum")

Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento track_history_column_list ou track_history_except_column_list é passado para a função.
name str O nome do fluxo. Se não for fornecido, será usado o mesmo valor que target.
once bool Opcionalmente, defina o fluxo como um fluxo único, como um backfill. O uso once=True altera o fluxo de duas maneiras:
  • O valor de retorno. streaming-query. deve ser um DataFrame em lote neste caso, não um DataFrame de streaming.
  • O fluxo é executado uma vez por padrão. Se o pipeline receber uma atualização completa, o fluxo ONCE será executado novamente para recriar os dados.