Referência da linguagem Python de Delta Live Tables

Este artigo fornece detalhes da interface de programação Python do Delta Live Tables.

Para saber mais sobre a API do SQL, confira a Referência da linguagem SQL de Delta Live Tables.

Para obter detalhes específicos sobre a configuração do carregador automático, consulte O que é o carregador automático?.

Antes de começar

Veja a seguir considerações importantes ao implementar pipelines com a interface do Python do Delta Live Tables:

  • Como as funções table() e view() do Python são invocadas várias vezes durante o planejamento e a execução de uma atualização de pipeline, não inclua código em uma dessas funções que possa ter efeitos colaterais (por exemplo, código que modifica dados ou envia um email). Para evitar um comportamento inesperado, as funções do Python que definem conjuntos de dados devem incluir apenas o código necessário para definir a tabela ou exibição.
  • Para executar operações como o envio de emails ou a integração com um serviço de monitoramento externo, especialmente em funções que definem conjuntos de dados, use ganchos de evento. Implementar essas operações nas funções que definem seus conjuntos de dados causará um comportamento inesperado.
  • As funções table e view de Python devem retornar um DataFrame. Algumas funções que operam em DataFrames não retornam DataFrames e não devem ser usadas. Essas operações incluem funções como collect(), count(), toPandas(), save() e saveAsTable(). Como as transformações de DataFrame são executadas depois que o grafo de fluxo de dados completo é resolvido, o uso dessas operações pode ter efeitos colaterais não intencionais.

Importar o módulo Python dlt

As funções de Delta Live Tables do Python são definidas no módulo dlt. Seus pipelines implementados com a API do Python devem importar esse módulo:

import dlt

Criar uma exibição materializada Delta Live Tables ou tabela de streaming

No Python, Delta Live Tables determina se um conjunto de dados deve ser atualizado como uma exibição materializada ou uma tabela de streaming com base na consulta definidora. O decorador @table pode ser usado para definir exibições materializadas e tabelas de streaming.

Para definir uma exibição materializada em Python, aplique @table a uma consulta que executa uma leitura estática em uma fonte de dados. Para definir uma tabela de streaming, aplique @table a uma consulta que execute uma leitura de streaming em uma fonte de dados ou use a função create_streaming_table(). Ambos os tipos de conjunto de dados têm a mesma especificação de sintaxe da seguinte maneira:

Observação

Para usar o argumento cluster_by para habilitar o clustering líquido, o pipeline deve ser configurado para usar o canal de visualização.

import dlt

@dlt.table(
  name="<name>",
  comment="<comment>",
  spark_conf={"<key>" : "<value>", "<key>" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  path="<storage-location-path>",
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  schema="schema-definition",
  row_filter = "row-filter-clause",
  temporary=False)
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Criar um modo de exibição Delta Live Tables

Para definir uma exibição em Python, aplique o decorador @view. Como o decorador @table, você pode usar exibições em Delta Live Tables para conjuntos de dados estáticos ou de streaming. A seguir está a sintaxe para definir modos de exibição com Python:

import dlt

@dlt.view(
  name="<name>",
  comment="<comment>")
@dlt.expect
@dlt.expect_or_fail
@dlt.expect_or_drop
@dlt.expect_all
@dlt.expect_all_or_drop
@dlt.expect_all_or_fail
def <function-name>():
    return (<query>)

Exemplo: definir tabelas e exibições

Para definir uma tabela ou exibição em Python, aplique o decorador @dlt.view ou @dlt.table a uma função. Você pode usar o nome da função ou o parâmetro name para atribuir o nome da tabela ou da exibição. O exemplo a seguir define dois conjuntos de dados diferentes: uma exibição chamada taxi_raw que usa um arquivo JSON como a fonte de entrada e uma tabela chamada filtered_data que usa a exibição taxi_raw como entrada:

import dlt

@dlt.view
def taxi_raw():
  return spark.read.format("json").load("/databricks-datasets/nyctaxi/sample/json/")

# Use the function name as the table name
@dlt.table
def filtered_data():
  return spark.read.table("LIVE.taxi_raw").where(...)

# Use the name parameter as the table name
@dlt.table(
  name="filtered_data")
def create_filtered_data():
  return spark.read.table("LIVE.taxi_raw").where(...)

Exemplo: acessar um conjunto de dados definido no mesmo pipeline

Observação

Embora as dlt.read() funções and dlt.read_stream() ainda estejam disponíveis e totalmente compatíveis com a interface Python do Delta Live Tables, o Databricks recomenda sempre usar as spark.read.table() funções and spark.readStream.table() devido ao seguinte:

  • As spark funções dão suporte à leitura de conjuntos de dados internos e externos, incluindo conjuntos de dados no armazenamento externo ou definidos em outros pipelines. As dlt funções dão suporte apenas à leitura de conjuntos de dados internos.
  • As spark funções dão suporte à especificação de opções, como skipChangeCommits, para ler operações. A especificação de opções não é suportada dlt pelas funções.

Para acessar um conjunto de dados definido no mesmo pipeline, use as spark.read.table() funções or spark.readStream.table() , acrescentando a LIVE palavra-chave ao nome do conjunto de dados:

@dlt.table
def customers_raw():
  return spark.read.format("csv").load("/data/customers.csv")

@dlt.table
def customers_filteredA():
  return spark.read.table("LIVE.customers_raw").where(...)

Exemplo: ler de uma tabela registrada em um metastore

Para ler os dados de uma tabela registrada no metastore do Hive, no argumento da função, omita a palavra-chave LIVE e, opcionalmente, qualifique o nome da tabela com o nome do banco de dados:

@dlt.table
def customers():
  return spark.read.table("sales.customers").where(...)

Para obter um exemplo de leitura de uma tabela do Catálogo Unity, consulte Ingerir dados em um pipeline do Catálogo Unity.

Exemplo: acessar um conjunto de dados usando spark.sql

Também é possível retornar um conjunto de dados usando uma expressão spark.sql em uma função de consulta. Para ler um conjunto de um conjunto de dados interno, inclua LIVE. no início do nome do conjunto de dados:

@dlt.table
def chicago_customers():
  return spark.sql("SELECT * FROM LIVE.customers_cleaned WHERE city = 'Chicago'")

Crie uma tabela para usar como destino de operações de streaming

Use a função create_streaming_table() para criar uma tabela de destino para a saída de registros por operações de streaming, inclusive registros de saída apply_changes(), apply_changes_from_snapshot() e @append_flow.

Observação

As funções create_target_table() e create_streaming_live_table() foram preteridas. O Databricks recomenda atualizar o código existente para usar a função create_streaming_table().

Observação

Para usar o argumento cluster_by para habilitar o clustering líquido, o pipeline deve ser configurado para usar o canal de visualização.

create_streaming_table(
  name = "<table-name>",
  comment = "<comment>"
  spark_conf={"<key>" : "<value", "<key" : "<value>"},
  table_properties={"<key>" : "<value>", "<key>" : "<value>"},
  partition_cols=["<partition-column>", "<partition-column>"],
  cluster_by = ["<clustering-column>", "<clustering-column>"],
  path="<storage-location-path>",
  schema="schema-definition",
  expect_all = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_drop = {"<key>" : "<value", "<key" : "<value>"},
  expect_all_or_fail = {"<key>" : "<value", "<key" : "<value>"},
  row_filter = "row-filter-clause"
)
Argumentos
name

Digite: str

O nome da tabela.

Este parâmetro é obrigatório.
comment

Digite: str

Uma descrição opcional para a tabela.
spark_conf

Digite: dict

Uma lista opcional de configurações do Spark para a execução dessa consulta.
table_properties

Digite: dict

Uma lista opcional de propriedades da tabela para a tabela.
partition_cols

Digite: array

Uma lista opcional de uma ou mais colunas a serem usadas para particionar a tabela.
cluster_by

Digite: array

Opcionalmente, habilite o clustering líquido na tabela e defina as colunas a serem usadas como chaves de clustering.

Confira Usar clustering líquido para tabelas Delta.
path

Digite: str

Um local de armazenamento opcional para os dados da tabela. Se não for definido, o sistema usará como padrão o local de armazenamento do pipeline.
schema

Tipo: str ou StructType

Uma definição de esquema opcional para a tabela. Os esquemas podem ser definidos como uma cadeia de caracteres DDL do SQL ou com um Python
StructType.
expect_all
expect_all_or_drop
expect_all_or_fail

Digite: dict

Restrições opcionais de qualidade de dados para a tabela. Confira Várias expectativas.
row_filter (Versão prévia pública)

Digite: str

Uma cláusula de filtro de linha opcional para a tabela. Confira Publicar tabelas com filtros de linha e máscaras de coluna.

Controlar como as tabelas são materializadas

As tabelas também oferecem controle adicional da materialização:

Observação

Para tabelas com menos de 1 TB de tamanho, o Databricks recomenda permitir que o Delta Live Tables controle a organização de dados. Você não deve especificar colunas de partição, a menos que espere que sua tabela cresça além de um terabyte.

Exemplo: especificar um esquema e colunas de partição

Opcionalmente, você pode especificar um esquema de tabela usando um Python StructType ou uma cadeia de caracteres DDL de SQL. Quando especificada com uma cadeia de caracteres DDL, a definição pode incluir colunas geradas.

Os exemplos a seguir criam uma tabela chamada sales com um esquema especificado usando um StructType do Python:

sales_schema = StructType([
  StructField("customer_id", StringType(), True),
  StructField("customer_name", StringType(), True),
  StructField("number_of_line_items", StringType(), True),
  StructField("order_datetime", StringType(), True),
  StructField("order_number", LongType(), True)]
)

@dlt.table(
  comment="Raw data on sales",
  schema=sales_schema)
def sales():
  return ("...")

O exemplo a seguir especifica o esquema de uma tabela usando uma cadeia de caracteres DDL, define uma coluna gerada e define uma coluna de partição:

@dlt.table(
  comment="Raw data on sales",
  schema="""
    customer_id STRING,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime))
    """,
  partition_cols = ["order_day_of_week"])
def sales():
  return ("...")

Por padrão, Delta Live Tables inferem o esquema da definição table, se você não especificar um esquema.

Configurar uma tabela de streaming para ignorar alterações em uma tabela de streaming de origem

Observação

  • O sinalizador skipChangeCommits funciona apenas com spark.readStream usando a função option(). Você não pode usar esse sinalizador em uma função dlt.read_stream().
  • Não é possível usar o sinalizador skipChangeCommits quando a tabela de streaming de origem está definida como destino de uma função apply_changes().

Por padrão, as tabelas de streaming exigem fontes somente acréscimo. Quando uma tabela de streaming usa outra tabela de streaming como origem, e a tabela de streaming de origem exige atualizações ou exclusões, por exemplo, o processamento do "direito de ser esquecido" do GDPR, o sinalizador skipChangeCommits pode ser definido durante a leitura da tabela de streaming de origem para ignorar essas alterações. Para obter mais informações sobre esse sinalizador, consulte Ignorar atualizações e exclusões.

@table
def b():
   return spark.readStream.option("skipChangeCommits", "true").table("LIVE.A")

Exemplo: definir as restrições de tabela

Importante

As restrições de tabela estão em Visualização Pública.

Ao especificar um esquema, você pode definir as chaves primárias e estrangeiras. As restrições são informativas e não são impostas. Consulte a cláusula CONSTRAINT na referência da linguagem SQL.

O exemplo a seguir define uma tabela com uma restrição de chave primária e estrangeira:

@dlt.table(
   schema="""
    customer_id STRING NOT NULL PRIMARY KEY,
    customer_name STRING,
    number_of_line_items STRING,
    order_datetime STRING,
    order_number LONG,
    order_day_of_week STRING GENERATED ALWAYS AS (dayofweek(order_datetime)),
    CONSTRAINT fk_customer_id FOREIGN KEY (customer_id) REFERENCES main.default.customers(customer_id)
    """
def sales():
   return ("...")

Exemplo: Definir um filtro de linha e uma máscara de coluna

Importante

Os filtros de linha e as máscaras de coluna estão em Visualização Pública.

Para criar uma exibição materializada ou tabela de streaming com um filtro de linha e uma máscara de coluna, use a cláusula ROW FILTER e a cláusula MASK. O exemplo a seguir demonstra como definir uma exibição materializada e uma tabela de streaming com um filtro de linha e uma máscara de coluna:

@dlt.table(
   schema="""
    id int COMMENT 'This is the customer ID',
    name string COMMENT 'This is the customer full name',
    region string,
    ssn string MASK catalog.schema.ssn_mask_fn USING COLUMNS (region)
    """,
  row_filter = "ROW FILTER catalog.schema.us_filter_fn ON (region, name)"
def sales():
   return ("...")

Para obter mais informações sobre filtros de linha e máscaras de coluna, consulte Publicar tabelas com filtros de linha e máscaras de coluna.

Propriedades do Python Delta Live Tables

As tabelas a seguir descrevem as opções e propriedades que você pode especificar ao definir tabelas e exibições com o Delta Live Tables:

Observação

Para usar o argumento cluster_by para habilitar o clustering líquido, o pipeline deve ser configurado para usar o canal de visualização.

@table ou @view
name

Digite: str

Um nome opcional para a tabela ou exibição. Se não estiver definido, o nome da função será usado como o nome da tabela ou da exibição.
comment

Digite: str

Uma descrição opcional para a tabela.
spark_conf

Digite: dict

Uma lista opcional de configurações do Spark para a execução dessa consulta.
table_properties

Digite: dict

Uma lista opcional de propriedades da tabela para a tabela.
path

Digite: str

Um local de armazenamento opcional para os dados da tabela. Se não for definido, o sistema usará como padrão o local de armazenamento do pipeline.
partition_cols

Digite: a collection of str

Uma coleção opcional, por exemplo, um list de uma ou mais colunas a serem usadas para particionar a tabela.
cluster_by

Digite: array

Opcionalmente, habilite o clustering líquido na tabela e defina as colunas a serem usadas como chaves de clustering.

Confira Usar clustering líquido para tabelas Delta.
schema

Tipo: str ou StructType

Uma definição de esquema opcional para a tabela. Os esquemas podem ser definidos como uma cadeia de caracteres DDL do SQL ou com um Python StructType.
temporary

Digite: bool

Crie uma tabela, mas não publique metadados para a tabela. A palavra-chave temporary instrui as Tabelas Dinâmicas Delta a criar uma tabela que fique disponível para o pipeline, mas não deve ser acessada fora do pipeline. Para reduzir o tempo de processamento, uma tabela temporária permanece por toda a duração do pipeline que a cria e não apenas para uma única atualização.

O padrão é "False".
row_filter (Versão prévia pública)

Digite: str

Uma cláusula de filtro de linha opcional para a tabela. Confira Publicar tabelas com filtros de linha e máscaras de coluna.
Definição de tabela ou exibição
def <function-name>()

Uma função Python que define o conjunto de dados. Se o parâmetro name não for definido, <function-name> será usado como o nome do conjunto de dados de destino.
query

Uma instrução SQL do Spark que retorna um conjunto de dados Spark ou um DataFrame do Koalas.

Use dlt.read() ou spark.read.table() para executar uma leitura completa de um conjunto de dados definido no mesmo pipeline. Para ler um conjunto de dados externo, use a spark.read.table() função. Você não pode usar dlt.read() para ler conjuntos de dados externos. Como spark.read.table() pode ser usado para ler conjuntos de dados internos, conjuntos de dados definidos fora do pipeline atual e permite que você especifique opções para leitura de dados, o Databricks recomenda usá-lo em vez da dlt.read() função.

Ao usar a spark.read.table() função para ler de um conjunto de dados definido no mesmo pipeline, anexe a LIVE palavra-chave ao nome do conjunto de dados no argumento da função. Por exemplo, para ler de um conjunto de dados chamado customers:

spark.read.table("LIVE.customers")

Você também pode usar a função spark.read.table() para ler de uma tabela registrada no metastore omitindo a palavra-chave LIVE e, opcionalmente, qualificando o nome da tabela com o nome do banco de dados:

spark.read.table("sales.customers")

Use dlt.read_stream() ou spark.readStream.table() para executar uma leitura de streaming de um conjunto de dados definido no mesmo pipeline. Para executar uma leitura de streaming de um conjunto de dados externo, use o
spark.readStream.table() função. Como spark.readStream.table() pode ser usado para ler conjuntos de dados internos, conjuntos de dados definidos fora do pipeline atual e permite que você especifique opções para leitura de dados, o Databricks recomenda usá-lo em vez da dlt.read_stream() função.

Para definir uma consulta em uma função Delta Live Tables table usando a sintaxe SQL, use a spark.sql função. Consulte Exemplo: acessar um conjunto de dados usando spark.sql. Para definir uma consulta em uma função Delta Live Tables table usando Python, use a sintaxe do PySpark .
Expectativas
@expect("description", "constraint")

Declarar uma restrição de qualidade de dados identificada por
description. Se uma linha violar a expectativa, inclua a linha no conjunto de dados de destino.
@expect_or_drop("description", "constraint")

Declarar uma restrição de qualidade de dados identificada por
description. Se uma linha violar a expectativa, exclua a linha do conjunto de dados de destino.
@expect_or_fail("description", "constraint")

Declarar uma restrição de qualidade de dados identificada por
description. Se uma linha violar a expectativa, pare imediatamente a execução.
@expect_all(expectations)

Declare uma ou mais restrições de qualidade de dados.
expectations é um dicionário Python, em que a chave é a descrição da expectativa e o valor é a restrição de expectativa. Se uma linha violar qualquer uma das expectativas, inclua a linha no conjunto de dados de destino.
@expect_all_or_drop(expectations)

Declare uma ou mais restrições de qualidade de dados.
expectations é um dicionário Python, em que a chave é a descrição da expectativa e o valor é a restrição de expectativa. Se uma linha violar alguma expectativa, exclua a linha do conjunto de dados de destino.
@expect_all_or_fail(expectations)

Declare uma ou mais restrições de qualidade de dados.
expectations é um dicionário Python, em que a chave é a descrição da expectativa e o valor é a restrição de expectativa. Se uma linha violar alguma expectativa, pare imediatamente a execução.

Captura de dados das alterações a partir de um feed de alterações com Python no Delta Live Tables

Use a função apply_changes() na API do Python para usar a funcionalidade de captura de dados de alteração (CDC) do Delta Live Tables para processar os dados de origem de um feed de dados de alteração (CDF).

Importante

Você deve declarar uma tabela de streaming de destino para aplicar alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de destino apply_changes(), você deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados que os campos sequence_by.

Para criar a tabela de destino necessária, você pode usar a função create_streaming_table() na interface do Python do Delta Live Tables.

apply_changes(
  target = "<target-table>",
  source = "<data-source>",
  keys = ["key1", "key2", "keyN"],
  sequence_by = "<sequence-column>",
  ignore_null_updates = False,
  apply_as_deletes = None,
  apply_as_truncates = None,
  column_list = None,
  except_column_list = None,
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
)

Observação

Para o processamento de APPLY CHANGES, o comportamento padrão para eventos INSERT e UPDATE é upsert eventos CDC da origem: atualizar qualquer linha na tabela de destino que corresponda à(s) chave(s) especificada(s) ou inserir uma nova linha quando um registro correspondente não existir na tabela de destino. A manipulação de eventos DELETE pode ser especificada com a condição APPLY AS DELETE WHEN.

Para saber mais sobre o processamento de CDC com um feed de alterações, consulte As APIs APPLY CHANGES: Simplificar a captura de dados de alterações com o Delta Live Tables. Para obter um exemplo de uso da função apply_changes(), consulte Exemplo: Processamento de SCD tipo 1 e SCD tipo 2 com dados de origem CDF.

Importante

Você deve declarar uma tabela de streaming de destino para aplicar alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela de destino apply_changes, você deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados que o campo sequence_by.

Consulte As APIs APPLY CHANGES: Simplifique a captura de dados de alterações com o Delta Live Tables.

Argumentos
target

Digite: str

O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a função apply_changes().

Este parâmetro é necessário.
source

Digite: str

A fonte de dados que contém registros de CDC.

Este parâmetro é obrigatório.
keys

Digite: list

A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos de CDC se aplicam a registros específicos na tabela de destino.

Você pode especificar:

- Uma lista de cadeias de caracteres: ["userId", "orderId"]
- Uma lista de funções SQL col() do Spark: [col("userId"), col("orderId"]

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Este parâmetro é obrigatório.
sequence_by

Tipo: str ou col()

O nome da coluna que especifica a ordem lógica dos eventos de CDC nos dados de origem. As tabelas Delta ao vivo usam esse sequenciamento para lidar com eventos de alteração que chegam fora de ordem.

Você pode especificar:

- Uma cadeia de caracteres: "sequenceNum"
- Uma função de SQL col() do Spark: col("sequenceNum")

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

A coluna especificada deve ser um tipo de dados classificável.

Este parâmetro é obrigatório.
ignore_null_updates

Digite: bool

Permitir a ingestão de atualizações contendo um subconjunto das colunas de destino. Quando um evento CDC corresponde a uma linha existente e ignore_null_updates é True, as colunas com null mantêm seus valores existentes no destino. Isso também se aplica a colunas aninhadas com um valor de null. Quando ignore_null_updates é False, os valores existentes são substituídos por valores null.

Esse parâmetro é opcional.

O padrão é False.
apply_as_deletes

Tipo: str ou expr()

Especifica quando um evento CDC deve ser tratado como um DELETE em vez de um Upsert. Para lidar com dados fora de ordem, a linha excluída é temporariamente retida como uma marca para exclusão na tabela Delta subjacente e uma exibição é criada no metastore que filtra essas marcas de exclusão. O intervalo de retenção pode ser configurado com o
pipelines.cdc.tombstoneGCThresholdInSeconds table propriedade.

Você pode especificar:

- Uma cadeia de caracteres: "Operation = 'DELETE'"
- Uma função de SQL expr() do Spark: expr("Operation = 'DELETE'")

Esse parâmetro é opcional.
apply_as_truncates

Tipo: str ou expr()

Especifica quando um evento CDC deve ser tratado como uma tabela TRUNCATE completa. Como essa cláusula dispara um truncado completo da tabela de destino, ela deve ser usada apenas para casos de uso específicos que exijam essa funcionalidade.

O parâmetro apply_as_truncates tem suporte apenas para SCD tipo 1. O SCD tipo 2 não dá suporte a operações de truncamento.

Você pode especificar:

- Uma cadeia de caracteres: "Operation = 'TRUNCATE'"
- Uma função de SQL expr() do Spark: expr("Operation = 'TRUNCATE'")

Esse parâmetro é opcional.
column_list

except_column_list

Digite: list

Um subconjunto de colunas a ser incluído na tabela de destino. Use column_list para especificar a lista completa de colunas a serem incluídas. Use except_column_list para especificar as colunas a serem excluídas. É possível declarar um valor como uma lista de cadeias de caracteres ou como funções de SQL col() do Spark:

- column_list = ["userId", "name", "city"].
- column_list = [col("userId"), col("name"), col("city")]
- except_column_list = ["operation", "sequenceNum"]
- except_column_list = [col("operation"), col("sequenceNum")

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Esse parâmetro é opcional.

O padrão é incluir todas as colunas na tabela de destino quando nenhum column_list ou argumento except_column_list for passado para a função.
stored_as_scd_type

Tipo: str ou int

Se é necessário armazenar registros como SCD tipo 1 ou SCD tipo 2.

Defina como 1 para o SCD tipo 1 ou como 2 para o SCD tipo 2.

Esta cláusula é opcional.

O padrão é SCD tipo 1.
track_history_column_list

track_history_except_column_list

Digite: list

Um subconjunto de colunas de saída a ser rastreado para o histórico na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Uso
track_history_except_column_list para especificar as colunas a serem excluídas do acompanhamento. É possível declarar um valor como uma lista de cadeias de caracteres ou como funções de SQL col() do Spark:
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Esse parâmetro é opcional.

O padrão é incluir todas as colunas na tabela de destino quando os argumentos track_history_column_list
e track_history_except_column_list não são passados para a função.

Alterar a captura de dados de instantâneos do banco de dados com Python no Delta Live Tables

Importante

A API APPLY CHANGES FROM SNAPSHOT está em Visualização Pública.

Use a função apply_changes_from_snapshot() na API do Python para usar a funcionalidade de captura de dados de alteração (CDC) do Delta Live Tables para processar os dados de origem de instantâneos de banco de dados.

Importante

Você deve declarar uma tabela de streaming de destino para aplicar alterações. Opcionalmente, você pode especificar o esquema para sua tabela de destino. Ao especificar o esquema da tabela apply_changes_from_snapshot() de destino, você também deve incluir as colunas __START_AT e __END_AT com o mesmo tipo de dados que o campo sequence_by.

Para criar a tabela de destino necessária, você pode usar a função create_streaming_table() na interface do Python do Delta Live Tables.

apply_changes_from_snapshot(
  target = "<target-table>",
  source = Any,
  keys = ["key1", "key2", "keyN"],
  stored_as_scd_type = <type>,
  track_history_column_list = None,
  track_history_except_column_list = None
) -> None

Observação

Para o processamento APPLY CHANGES FROM SNAPSHOT, o comportamento padrão é inserir uma nova linha quando um registro correspondente com a(s) mesma(s) chave(s) não existir no destino. Se houver um registro correspondente, ele será atualizado somente se algum dos valores da linha tiver sido alterado. As linhas com chaves presentes no destino, mas que não estão mais presentes na origem, são excluídas.

Para saber mais sobre o processamento de CDC com instantâneos, consulte As APIs APPLY CHANGES: Simplificar a captura de dados de alterações com o Delta Live Tables. Para exemplos de uso da função apply_changes_from_snapshot(), consulte os exemplos de ingestão de instantâneos periódicos e ingestão de instantâneos históricos.

Argumentos
target

Digite: str

O nome da tabela a ser atualizada. Você pode usar a função create_streaming_table() para criar a tabela de destino antes de executar a função apply_changes().

Este parâmetro é obrigatório.
source

Tipo: str ou lambda function

O nome de uma tabela ou exibição a ser capturada periodicamente ou uma função lambda Python que retorna o DataFrame do instantâneo a ser processado e a versão do instantâneo. Consulte Implementar o argumento de origem.

Este parâmetro é obrigatório.
keys

Digite: list

A coluna ou combinação de colunas que identifica exclusivamente uma linha nos dados de origem. Isso é usado para identificar quais eventos de CDC se aplicam a registros específicos na tabela de destino.

Você pode especificar:

- Uma lista de cadeias de caracteres: ["userId", "orderId"]
- Uma lista de funções SQL col() do Spark: [col("userId"), col("orderId"]

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Este parâmetro é obrigatório.
stored_as_scd_type

Tipo: str ou int

Se é necessário armazenar registros como SCD tipo 1 ou SCD tipo 2.

Defina como 1 para o SCD tipo 1 ou como 2 para o SCD tipo 2.

Esta cláusula é opcional.

O padrão é SCD tipo 1.
track_history_column_list

track_history_except_column_list

Digite: list

Um subconjunto de colunas de saída a ser rastreado para o histórico na tabela de destino. Use track_history_column_list para especificar a lista completa de colunas a serem rastreadas. Uso
track_history_except_column_list para especificar as colunas a serem excluídas do acompanhamento. É possível declarar um valor como uma lista de cadeias de caracteres ou como funções de SQL col() do Spark:
- track_history_column_list = ["userId", "name", "city"].
- track_history_column_list = [col("userId"), col("name"), col("city")]
- track_history_except_column_list = ["operation", "sequenceNum"]
- track_history_except_column_list = [col("operation"), col("sequenceNum")

Argumentos para funções col() não podem incluir qualificadores. Por exemplo, você pode usar col(userId), mas não pode usar col(source.userId).

Esse parâmetro é opcional.

O padrão é incluir todas as colunas na tabela de destino quando os argumentos track_history_column_list
e track_history_except_column_list não são passados para a função.

Implementar o argumento source

A função apply_changes_from_snapshot() inclui o argumento source. Para processar instantâneos históricos, espera-se que o argumento source seja uma função lambda Python que retorne dois valores para a função apply_changes_from_snapshot(): um DataFrame Python contendo os dados do instantâneo a ser processado e uma versão do instantâneo.

A seguir, a assinatura da função lambda:

lambda Any => Optional[(DataFrame, Any)]
  • O argumento para a função lambda é a versão do instantâneo processada mais recentemente.
  • O valor de retorno da função lambda é None ou uma tupla de dois valores: O primeiro valor da tupla é um DataFrame que contém o instantâneo a ser processado. O segundo valor da tupla é a versão do instantâneo que representa a ordem lógica do instantâneo.

Um exemplo que implementa e chama a função lambda:

def next_snapshot_and_version(latest_snapshot_version):
 if latest_snapshot_version is None:
   return (spark.read.load("filename.csv"), 1)
 else:
   return None

apply_changes_from_snapshot(
  # ...
  source = next_snapshot_and_version,
  # ...
)

O runtime do Delta Live Tables executa as seguintes etapas cada vez que o pipeline que contém a função apply_changes_from_snapshot() é disparado:

  1. Executa a função next_snapshot_and_version para carregar o DataFrame do próximo instantâneo e a versão do instantâneo correspondente.
  2. Se nenhum DataFrame retornar, a execução será encerrada e a atualização do pipeline será marcada como concluída.
  3. Detecta as alterações no novo instantâneo e as aplica de forma incremental à tabela de destino.
  4. Retorna à etapa 1 para carregar o próximo instantâneo e sua versão.

Limitações

A interface do Python do Delta Live Tables tem a seguinte limitação:

Não há suporte para a função pivot(). A operação pivot no Spark requer o carregamento adiantado de dados de entrada para calcular o esquema da saída. Não há suporte para essa funcionalidade no Delta Live Tables.