Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
A create_auto_cdc_flow() função cria um fluxo que usa a funcionalidade CDC (captura de dados de mudanças) do Lakeflow Spark Declarative Pipelines para processar dados de origem de um feed de dados de alteração (CDF).
Observação
Esta função substitui a função apply_changes()anterior. As duas funções têm a mesma assinatura. O Databricks recomenda a atualização para usar o novo nome.
Importante
Você deve declarar uma tabela de streaming de destino para aplicar as alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o create_auto_cdc_flow() esquema da tabela de destino, você deve incluir as __START_AT colunas e __END_AT com o mesmo tipo de dados que os sequence_by campos.
Para criar a tabela de destino necessária, você pode usar a função create_streaming_table() na interface Python do pipeline.
Sintaxe
from pyspark import pipelines as dp
dp.create_auto_cdc_flow(
target = "<target-table>",
source = "<data-source>",
keys = ["key1", "key2", "keyN"],
sequence_by = "<sequence-column>",
ignore_null_updates = <bool>,
apply_as_deletes = None,
apply_as_truncates = None,
column_list = None,
except_column_list = None,
stored_as_scd_type = <type>,
track_history_column_list = None,
track_history_except_column_list = None,
name = None,
once = <bool>
)
Para create_auto_cdc_flow processamento, o comportamento padrão para INSERT eventos e UPDATE é atualizar eventos CDC da origem: atualizar quaisquer linhas na tabela de destino que correspondam à(s) chave(s) especificada(s) ou inserir uma nova linha quando um registro correspondente não existir na tabela de destino. A manipulação de DELETE eventos pode ser especificada com o apply_as_deletes parâmetro.
Para saber mais sobre o processamento de CDC com um feed de alterações, consulte As APIs AUTO CDC: Simplifique a captura de dados de alterações com pipelines. Para obter um exemplo de uso da create_auto_cdc_flow() função, consulte Exemplo: processamento de SCD tipo 1 e SCD tipo 2 com dados de origem CDF.
Parâmetros
| Parâmetro | Tipo | Description |
|---|---|---|
target |
str |
Required. O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a create_auto_cdc_flow() função. |
source |
str |
Required. A fonte de dados que contém os registros do CDC. |
keys |
list |
Required. A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos CDC se aplicam a registros específicos na tabela de destino. Você pode especificar:
|
sequence_by |
str, col() ou struct() |
Required. Os nomes das colunas que especificam a ordem lógica dos eventos CDC nos dados fonte. O Lakeflow Spark Declarative Pipelines usa esse sequenciamento para manipular eventos de alteração que chegam fora de ordem. A coluna especificada deve ser um tipo de dados classificável. Você pode especificar:
|
ignore_null_updates |
bool |
Permitir a importação de atualizações contendo um subconjunto das colunas de destino. Quando um evento CDC corresponde a uma linha existente e ignore_null_updates é True, as colunas com um null mantêm os seus valores existentes no destino. Isso também se aplica a colunas aninhadas com um valor de null. Quando ignore_null_updates é False, os valores existentes são substituídos por null valores.A predefinição é False. |
apply_as_deletes |
str ou expr() |
Especifica quando um evento CDC deve ser tratado como um DELETE em vez de um upsert. Você pode especificar:
Para lidar com dados fora de ordem, a linha excluída é temporariamente mantida como uma marca de exclusão na tabela Delta subjacente e uma exibição é criada no metastore que filtra essas lápides. O intervalo de retenção é definido por padrão para dois dias e pode ser configurado com a propriedade da tabela pipelines.cdc.tombstoneGCThresholdInSeconds. |
apply_as_truncates |
str ou expr() |
Especifica quando um evento CDC deve ser tratado como uma tabela TRUNCATEcompleta. Você pode especificar:
Como essa cláusula aciona um truncado completo da tabela de destino, ela deve ser usada apenas para casos de uso específicos que exijam essa funcionalidade. O apply_as_truncates parâmetro é suportado apenas para SCD tipo 1. SCD tipo 2 não suporta operações de truncamento. |
column_list ou except_column_list |
list |
Um subconjunto de colunas a serem incluídas na tabela de destino. Use column_list para especificar a lista completa de colunas a serem incluídas. Use except_column_list para especificar as colunas a serem excluídas. Você pode declarar qualquer valor como uma lista de cadeias de caracteres ou como funções do Spark SQL col() :
Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento column_list ou except_column_list é passado para a função. |
stored_as_scd_type |
str ou int |
Se deseja armazenar registros como SCD tipo 1 ou SCD tipo 2. Definido como 1 para SCD tipo 1 ou 2 para SCD tipo 2. O padrão é SCD tipo 1. |
track_history_column_list ou track_history_except_column_list |
list |
Um subconjunto de colunas de saída a serem acompanhadas para o histórico na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Use track_history_except_column_list para especificar as colunas a serem excluídas do rastreamento. Você pode declarar qualquer valor como uma lista de cadeias de caracteres ou como funções do Spark SQL col() :
Os argumentos para col() funções não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId). O padrão é incluir todas as colunas na tabela de destino quando nenhum argumento track_history_column_list ou track_history_except_column_list é passado para a função. |
name |
str |
O nome do fluxo. Se não for fornecido, será usado o mesmo valor que target. |
once |
bool |
Opcionalmente, defina o fluxo como um fluxo único, como um backfill. O uso once=True altera o fluxo de duas maneiras:
|