Partilhar via


As APIs APPLY CHANGES: simplifique a captura de dados de alteração com o Delta Live Tables

O Delta Live Tables simplifica a captura de dados de alteração (CDC) com as APPLY CHANGES APIs e APPLY CHANGES FROM SNAPSHOT . A interface que você usa depende da fonte de dados de alteração:

  • Use APPLY CHANGES para processar alterações de um feed de dados de alteração (CDF).
  • Use APPLY CHANGES FROM SNAPSHOT (Visualização pública) para processar alterações em instantâneos de banco de dados.

Anteriormente, a MERGE INTO instrução era comumente usada para processar registros CDC no Azure Databricks. No entanto, MERGE INTO pode produzir resultados incorretos devido a registros fora de sequência ou requer lógica complexa para reordenar registros.

A APPLY CHANGES API é suportada nas interfaces Delta Live Tables SQL e Python. A APPLY CHANGES FROM SNAPSHOT API é suportada na interface Python do Delta Live Tables.

Ambos e APPLY CHANGES APPLY CHANGES FROM SNAPSHOT suportam a atualização de tabelas usando SCD tipo 1 e tipo 2:

  • Use o SCD tipo 1 para atualizar registros diretamente. O histórico não é retido para registros atualizados.
  • Use o SCD tipo 2 para manter um histórico de registros, seja em todas as atualizações ou em atualizações de um conjunto especificado de colunas.

Para sintaxe e outras referências, consulte:

Nota

Este artigo descreve como atualizar tabelas no pipeline Delta Live Tables com base em alterações nos dados de origem. Para saber como registrar e consultar informações de alteração no nível da linha para tabelas Delta, consulte Usar feed de dados de alteração do Lago Delta no Azure Databricks.

Requisitos

Para usar as APIs CDC, seu pipeline deve ser configurado para usar pipelines DLT sem servidor ou as Delta Live Tables Pro ou Advanced edições.

Como o CDC é implementado com a APPLY CHANGES API?

Ao lidar automaticamente com registros fora de sequência, a APPLY CHANGES API em Delta Live Tables garante o processamento correto de registros CDC e elimina a necessidade de desenvolver uma lógica complexa para lidar com registros fora de sequência. Você deve especificar uma coluna nos dados de origem na qual sequenciar registros, que o Delta Live Tables interpreta como uma representação monotonicamente crescente da ordenação adequada dos dados de origem. O Delta Live Tables lida automaticamente com os dados que chegam fora de ordem. Para alterações do tipo 2 do SCD, o Delta Live Tables propaga os valores de sequenciamento apropriados __START_AT para as colunas e __END_AT tabelas de destino. Deve haver uma atualização distinta por chave em cada valor de sequenciamento, e os valores de sequenciamento NULL não são suportados.

Para executar o processamento CDC com APPLY CHANGESo , primeiro crie uma tabela de streaming e, em seguida, use a APPLY CHANGES INTO instrução em SQL ou a apply_changes() função em Python para especificar a origem, as chaves e o sequenciamento para o feed de alterações. Para criar a tabela de streaming de destino, use a CREATE OR REFRESH STREAMING TABLE instrução em SQL ou a create_streaming_table() função em Python. Consulte os exemplos de processamento de SCD tipo 1 e tipo 2.

Para obter detalhes de sintaxe, consulte a referência SQL do Delta Live Tables ou a referência do Python.

Como o CDC é implementado com a APPLY CHANGES FROM SNAPSHOT API?

Importante

A APPLY CHANGES FROM SNAPSHOT API está em Visualização Pública.

APPLY CHANGES FROM SNAPSHOT é uma API declarativa que determina eficientemente as alterações nos dados de origem comparando uma série de instantâneos em ordem e, em seguida, executa o processamento necessário para o processamento CDC dos registros nos instantâneos. APPLY CHANGES FROM SNAPSHOT é suportado apenas pela interface Python Delta Live Tables.

APPLY CHANGES FROM SNAPSHOT Suporta a ingestão de snapshots de vários tipos de origem:

  • Use a ingestão periódica de instantâneos para ingerir instantâneos de uma tabela ou exibição existente. APPLY CHANGES FROM SNAPSHOT tem uma interface simples e simplificada para suportar a ingestão periódica de instantâneos de um objeto de banco de dados existente. Um novo snapshot é ingerido com cada atualização de pipeline e o tempo de ingestão é usado como a versão do snapshot. Quando um pipeline é executado no modo contínuo, vários snapshots são ingeridos com cada atualização de pipeline em um período determinado pela configuração de intervalo de gatilho para o fluxo que contém o processamento APPLY CHANGES FROM SNAPSHOT.
  • Use a ingestão de instantâneos históricos para processar arquivos que contenham instantâneos de banco de dados, como instantâneos gerados a partir de um banco de dados Oracle ou MySQL ou de um data warehouse.

Para executar o processamento CDC de qualquer tipo de origem com APPLY CHANGES FROM SNAPSHOTo , primeiro crie uma tabela de streaming e, em seguida, use a apply_changes_from_snapshot() função em Python para especificar o instantâneo, as chaves e outros argumentos necessários para implementar o processamento. Veja os exemplos de ingestão periódica de instantâneos e ingestão de instantâneos históricos.

Os snapshots passados para a API devem estar em ordem crescente por versão. Se o Delta Live Tables detetar um instantâneo fora de ordem, um erro será lançado.

Para obter detalhes de sintaxe, consulte a referência Delta Live Tables Python.

Limitações

O destino de uma APPLY CHANGES consulta ou APPLY CHANGES FROM SNAPSHOT não pode ser usado como fonte para uma tabela de streaming. Uma tabela que lê a partir do destino de uma APPLY CHANGES consulta ou APPLY CHANGES FROM SNAPSHOT deve ser uma exibição materializada.

Exemplo: processamento de SCD tipo 1 e SCD tipo 2 com dados de origem CDF

As seções a seguir fornecem exemplos de consultas Delta Live Tables SCD tipo 1 e tipo 2 que atualizam tabelas de destino com base em eventos de origem de um feed de dados de alteração que:

  1. Cria novos registros de usuário.
  2. Exclui um registro de usuário.
  3. Atualiza os registros do usuário. No exemplo SCD tipo 1, as últimas UPDATE operações chegam atrasadas e são descartadas da tabela de destino, demonstrando a manipulação de eventos fora de ordem.

Os exemplos a seguir pressupõem familiaridade com a configuração e atualização de pipelines Delta Live Tables. Consulte Tutorial: Execute seu primeiro pipeline Delta Live Tables.

Para executar esses exemplos, você deve começar criando um conjunto de dados de exemplo. Consulte Gerar dados de teste.

A seguir estão os registros de entrada para esses exemplos:

ID de Utilizador nome cidade operation seqüênciaNum
124 Raul Oaxaca INSERT 1
123 Isabel Monterrey INSERT 1
125 Mercedes Tijuana INSERT 2
126 Lírio Cancún INSERT 2
123 nulo nulo DELETE 6
125 Mercedes Guadalajara ATUALIZAR 6
125 Mercedes Mexicali ATUALIZAR 5
123 Isabel Chihuahua ATUALIZAR 5

Se você descomentar a linha final nos dados de exemplo, ele inserirá o seguinte registro que especifica onde os registros devem ser truncados:

ID de Utilizador nome cidade operation seqüênciaNum
nulo nulo nulo TRUNCATE 3

Nota

Todos os exemplos a seguir incluem opções para especificar ambas e DELETE TRUNCATE operações, mas cada uma é opcional.

Processar atualizações do SCD tipo 1

O exemplo a seguir demonstra o processamento de atualizações do SCD tipo 1:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  apply_as_truncates = expr("operation = 'TRUNCATE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = 1
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
APPLY AS TRUNCATE WHEN
  operation = "TRUNCATE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 1;

Depois de executar o exemplo SCD tipo 1, a tabela de destino contém os seguintes registros:

ID de Utilizador nome cidade
124 Raul Oaxaca
125 Mercedes Guadalajara
126 Lírio Cancún

Depois de executar o exemplo SCD tipo 1 com o registro adicional TRUNCATE , os registros 124 são truncados devido 126 à TRUNCATE operação em sequenceNum=3, e a tabela de destino contém o seguinte registro:

ID de Utilizador nome cidade
125 Mercedes Guadalajara

Processar atualizações do tipo 2 do SCD

O exemplo a seguir demonstra o processamento de atualizações do tipo 2 do SCD:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2"
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2;

Depois de executar o exemplo SCD tipo 2, a tabela de destino contém os seguintes registros:

ID de Utilizador nome cidade __START_AT __END_AT
123 Isabel Monterrey 1 5
123 Isabel Chihuahua 5 6
124 Raul Oaxaca 1 nulo
125 Mercedes Tijuana 2 5
125 Mercedes Mexicali 5 6
125 Mercedes Guadalajara 6 nulo
126 Lírio Cancún 2 nulo

Uma consulta SCD tipo 2 também pode especificar um subconjunto de colunas de saída a serem rastreadas para o histórico na tabela de destino. As alterações em outras colunas são atualizadas no local, em vez de gerar novos registros de histórico. O exemplo a seguir demonstra a exclusão da city coluna do rastreamento:

O exemplo a seguir demonstra o uso do histórico de faixas com SCD tipo 2:

Python

import dlt
from pyspark.sql.functions import col, expr

@dlt.view
def users():
  return spark.readStream.table("cdc_data.users")

dlt.create_streaming_table("target")

dlt.apply_changes(
  target = "target",
  source = "users",
  keys = ["userId"],
  sequence_by = col("sequenceNum"),
  apply_as_deletes = expr("operation = 'DELETE'"),
  except_column_list = ["operation", "sequenceNum"],
  stored_as_scd_type = "2",
  track_history_except_column_list = ["city"]
)

SQL

-- Create and populate the target table.
CREATE OR REFRESH STREAMING TABLE target;

APPLY CHANGES INTO
  live.target
FROM
  stream(cdc_data.users)
KEYS
  (userId)
APPLY AS DELETE WHEN
  operation = "DELETE"
SEQUENCE BY
  sequenceNum
COLUMNS * EXCEPT
  (operation, sequenceNum)
STORED AS
  SCD TYPE 2
TRACK HISTORY ON * EXCEPT
  (city)

Depois de executar este exemplo sem o registro adicional TRUNCATE , a tabela de destino contém os seguintes registros:

ID de Utilizador nome cidade __START_AT __END_AT
123 Isabel Chihuahua 1 6
124 Raul Oaxaca 1 nulo
125 Mercedes Guadalajara 2 nulo
126 Lírio Cancún 2 nulo

Gerar dados de teste

O código abaixo é fornecido para gerar um conjunto de dados de exemplo para uso nas consultas de exemplo presentes neste tutorial. Supondo que você tenha as credenciais adequadas para criar um novo esquema e criar uma nova tabela, você pode executar essas instruções com um bloco de anotações ou Databricks SQL. O código a seguir não se destina a ser executado como parte de um pipeline Delta Live Tables:

CREATE SCHEMA IF NOT EXISTS cdc_data;

CREATE TABLE
  cdc_data.users
AS SELECT
  col1 AS userId,
  col2 AS name,
  col3 AS city,
  col4 AS operation,
  col5 AS sequenceNum
FROM (
  VALUES
  -- Initial load.
  (124, "Raul",     "Oaxaca",      "INSERT", 1),
  (123, "Isabel",   "Monterrey",   "INSERT", 1),
  -- New users.
  (125, "Mercedes", "Tijuana",     "INSERT", 2),
  (126, "Lily",     "Cancun",      "INSERT", 2),
  -- Isabel is removed from the system and Mercedes moved to Guadalajara.
  (123, null,       null,          "DELETE", 6),
  (125, "Mercedes", "Guadalajara", "UPDATE", 6),
  -- This batch of updates arrived out of order. The above batch at sequenceNum 5 will be the final state.
  (125, "Mercedes", "Mexicali",    "UPDATE", 5),
  (123, "Isabel",   "Chihuahua",   "UPDATE", 5)
  -- Uncomment to test TRUNCATE.
  -- ,(null, null,      null,          "TRUNCATE", 3)
);

Exemplo: processamento periódico de snapshot

O exemplo a seguir demonstra o processamento SCD tipo 2 que ingere instantâneos de uma tabela armazenada em mycatalog.myschema.mytable. Os resultados do processamento são gravados em uma tabela chamada target.

mycatalog.myschema.mytable registros no timestamp 2024-01-01 00:00:00

Key valor
1 a1
2 a2

mycatalog.myschema.mytable registros no timestamp 2024-01-01 12:00:00

Key valor
2 b2
3 a3
import dlt

@dlt.view(name="source")
def source():
 return spark.read.table("mycatalog.myschema.mytable")

dlt.create_streaming_table("target")

dlt.apply_changes_from_snapshot(
 target="target",
 source="source",
 keys=["key"],
 stored_as_scd_type=2
)

Depois de processar os instantâneos, a tabela de destino contém os seguintes registros:

Key valor __START_AT __END_AT
1 a1 2024-01-01 00:00:00 2024-01-01 12:00:00
2 a2 2024-01-01 00:00:00 2024-01-01 12:00:00
2 b2 2024-01-01 12:00:00 nulo
3 a3 2024-01-01 12:00:00 nulo

Exemplo: processamento de instantâneos históricos

O exemplo a seguir demonstra o processamento SCD tipo 2 que atualiza uma tabela de destino com base em eventos de origem de dois snapshots armazenados em um sistema de armazenamento em nuvem:

Instantâneo em timestamp, armazenado em /<PATH>/filename1.csv

Chave TrackingColumn NonTrackingColumn
1 a1 b1
2 a2 b2
4 a4 b4

Instantâneo em timestamp + 5, armazenado em /<PATH>/filename2.csv

Chave TrackingColumn NonTrackingColumn
2 a2_new b2
3 a3 b3
4 a4 b4_new

O exemplo de código a seguir demonstra o processamento de atualizações SCD tipo 2 com esses instantâneos:

import dlt

def exist(file_name):
  # Storage system-dependent function that returns true if file_name exists, false otherwise

# This function returns a tuple, where the first value is a DataFrame containing the snapshot
# records to process, and the second value is the snapshot version representing the logical
# order of the snapshot.
# Returns None if no snapshot exists.
def next_snapshot_and_version(latest_snapshot_version):
  latest_snapshot_version = latest_snapshot_version or 0
  next_version = latest_snapshot_version + 1
  file_name = "dir_path/filename_" + next_version + ".csv"
  if (exist(file_name)):
    return (spark.read.load(file_name), next_version)
   else:
     # No snapshot available
     return None

dlt.create_streaming_live_table("target")

dlt.apply_changes_from_snapshot(
  target = "target",
  source = next_snapshot_and_version,
  keys = ["Key"],
  stored_as_scd_type = 2,
  track_history_column_list = ["TrackingCol"]
)

Depois de processar os instantâneos, a tabela de destino contém os seguintes registros:

Chave TrackingColumn NonTrackingColumn __START_AT __END_AT
1 a1 b1 1 2
2 a2 b2 1 2
2 a2_new b2 2 nulo
3 a3 b3 2 nulo
4 a4 b4_new 1 nulo

Adicionar, alterar ou excluir dados em uma tabela de streaming de destino

Se o pipeline publicar tabelas no Unity Catalog, você poderá usar instruções DML (linguagem de manipulação de dados), incluindo instruções insert, update, delete e merge, para modificar as tabelas de streaming de destino criadas pelas APPLY CHANGES INTO instruções.

Nota

  • Não há suporte para instruções DML que modificam o esquema de tabela de uma tabela de streaming. Certifique-se de que suas instruções DML não tentem evoluir o esquema da tabela.
  • As instruções DML que atualizam uma tabela de streaming podem ser executadas somente em um cluster compartilhado do Catálogo Unity ou em um armazém SQL usando o Databricks Runtime 13.3 LTS e superior.
  • Como o streaming requer fontes de dados somente acréscimo, se o processamento exigir streaming de uma tabela de streaming de origem com alterações (por exemplo, por instruções DML), defina o sinalizador skipChangeCommits ao ler a tabela de streaming de origem. Quando skipChangeCommits é definido, as transações que excluem ou modificam registros na tabela de origem são ignoradas. Se o processamento não exigir uma tabela de streaming, você poderá usar uma exibição materializada (que não tem a restrição de acréscimo apenas) como tabela de destino.

Como o Delta Live Tables usa uma coluna especificada SEQUENCE BY e propaga valores de sequenciamento apropriados para as __START_AT colunas e __END_AT da tabela de destino (para SCD tipo 2), você deve garantir que as instruções DML usem valores válidos para essas colunas para manter a ordenação adequada dos registros. Consulte Como o CDC é implementado com a APPLY CHANGES API?.

Para obter mais informações sobre como usar instruções DML com tabelas de streaming, consulte Adicionar, alterar ou excluir dados em uma tabela de streaming.

O exemplo a seguir insere um registro ativo com uma sequência inicial de 5:

INSERT INTO my_streaming_table (id, name, __START_AT, __END_AT) VALUES (123, 'John Doe', 5, NULL);

Obter dados sobre registros processados por uma consulta CDC do Delta Live Tables

Nota

As métricas a seguir são capturadas apenas por APPLY CHANGES consultas, e não por APPLY CHANGES FROM SNAPSHOT consultas.

As seguintes métricas são capturadas por APPLY CHANGES consultas:

  • num_upserted_rows: O número de linhas de saída inseridas no conjunto de dados durante uma atualização.
  • num_deleted_rows: O número de linhas de saída existentes excluídas do conjunto de dados durante uma atualização.

A num_output_rows métrica, que é a saída para fluxos não CDC, não é capturada para apply changes consultas.

Quais objetos de dados são usados para o processamento CDC do Delta Live Tables?

Nota: As seguintes estruturas de dados aplicam-se apenas ao processamento, não APPLY CHANGES FROM SNAPSHOT ao APPLY CHANGES processamento.

Quando você declara a tabela de destino no metastore do Hive, duas estruturas de dados são criadas:

  • Um modo de exibição usando o nome atribuído à tabela de destino.
  • Uma tabela de suporte interna usada pela Delta Live Tables para gerenciar o processamento CDC. Esta tabela é nomeada por preceder __apply_changes_storage_ o nome da tabela de destino.

Por exemplo, se você declarar uma tabela de destino chamada dlt_cdc_target, verá uma exibição nomeada dlt_cdc_target e uma tabela nomeada __apply_changes_storage_dlt_cdc_target no metastore. A criação de uma exibição permite que o Delta Live Tables filtre as informações extras (por exemplo, lápides e versões) necessárias para lidar com dados fora de ordem. Para visualizar os dados processados, consulte a vista de destino. Como o __apply_changes_storage_ esquema da tabela pode mudar para oferecer suporte a recursos ou aprimoramentos futuros, você não deve consultar a tabela para uso em produção. Se você adicionar dados manualmente à tabela, presume-se que os registros venham antes de outras alterações porque as colunas de versão estão ausentes.

Se um pipeline for publicado no Unity Catalog, as tabelas de suporte internas ficarão inacessíveis aos usuários.