Partilhar via


ETL em Databricks SQL

Ao lidar com grandes quantidades de dados, é necessário um pipeline que possa processar apenas os registos novos e alterados, em vez de reprocessar todo o conjunto de dados. Isto chama-se ETL incremental. No Databricks SQL, pode construir pipelines ETL incrementais usando tabelas de streaming e vistas materializadas, sem escrever código procedural ou agendar atualizações manuais.

Este tutorial guia-o por um padrão comum: acompanhar as alterações do produto ao longo do tempo. Cria-se uma tabela de origem, captura-se eventos de alteração, constrói-se uma tabela de dimensões que preserva o histórico completo de cada produto e adiciona-se uma camada agregada de relatórios por cima.

A principal funcionalidade deste tutorial é AUTO CDC. Num armazém tradicional, escreveria instruções complexas MERGE INTO para conciliar a inserção, atualização e eliminação de eventos numa tabela alvo. Esta abordagem é propensa a erros, especialmente quando os acontecimentos surgem fora de ordem. AUTO CDC Trata disto por ti. Declaras a chave de negócio, a coluna de sequenciação e se queres SCD Tipo 1 (apenas o valor mais recente) ou SCD Tipo 2 (histórico completo), e o Azure Databricks aplica automaticamente a lógica de fusão correta. Para uma visão geral do CDC, veja As APIs AUTO CDC – Simplifique a captura de dados de alterações com pipelines.

Ao final deste tutorial, você terá:

  1. Criei uma tabela de origem que acompanha as alterações com o Change Data Feed.
  2. Inspecionei os dados brutos das alterações para compreender o fluxo de eventos do CDC.
  3. Usado AUTO CDC para construir uma tabela de dimensões SCD Tipo 2 a partir desses eventos.
  4. Processa os eventos de eliminação incrementalmente através do pipeline.
  5. Criei uma vista materializada que mantém um relatório agregado de forma incremental.
  6. Configurado SCHEDULE REFRESH EVERY 1 DAY para que as alterações se propaguem automaticamente ao longo da pipeline.

Requisitos

Para concluir este tutorial, você deve atender aos seguintes requisitos:

Passo 1: Configure o seu catálogo e esquema

Abre o editor SQL do Databricks e define o teu catálogo e esquema de trabalho. É necessário ter permissão para USE o catálogo e o esquema que selecionar:

USE CATALOG <your-catalog>;
USE SCHEMA <your-schema>;

Passo 2: Criar uma tabela de origem e carregar os dados

Crie uma tabela products com Use o feed de dados de alterações do Delta Lake no Azure Databricks (CDF) ativado. O CDF é uma funcionalidade Delta Lake que regista cada inserção, atualização e eliminação como um registo de alterações consultável. Isto é semelhante a um fluxo CDC de um sistema de origem transacional, exceto que as alterações são capturadas diretamente na tabela Delta em vez de num log externo. Aqui utiliza-se o CDF para gerar os eventos de alteração que o pipeline a jusante irá consumir.

  1. Crie a tabela e carregue os registos iniciais:

    CREATE OR REPLACE TABLE products (
      product_id INT,
      product_name STRING,
      category STRING,
      warehouse STRING
    )
    TBLPROPERTIES (delta.enableChangeDataFeed = true);
    
    INSERT INTO products VALUES
      (1, 'Spoon', 'Cutlery', 'Seattle'),
      (2, 'Fork', 'Cutlery', 'Portland'),
      (3, 'Knife', 'Cutlery', 'Denver'),
      (4, 'Chair', 'Furniture', 'Austin'),
      (5, 'Table', 'Furniture', 'Chicago'),
      (6, 'Lamp', 'Lighting', 'Boston'),
      (7, 'Mug', 'Kitchenware', 'Seattle'),
      (8, 'Plate', 'Kitchenware', 'Atlanta'),
      (9, 'Bowl', 'Kitchenware', 'Dallas'),
      (10, 'Glass', 'Kitchenware', 'Phoenix');
    
  2. Simule alterações a montante, incluindo novos produtos, uma mudança de armazém e uma reatribuição de categoria:

    INSERT INTO products VALUES
      (11, 'Napkin', 'Dining', 'San Francisco'),
      (12, 'Coaster', 'Dining', 'New York');
    
    UPDATE products SET warehouse = 'Los Angeles' WHERE product_id = 1;
    UPDATE products SET category = 'Dining' WHERE product_id = 2;
    

Passo 3: Consultar o feed de dados de alterações

Antes de construir o pipeline a jusante, ajuda olhar para os eventos brutos de alteração para perceber o que AUTO CDC irá processar. A table_changes() função lê o registo do CDF e devolve todas as operações capturadas juntamente com as colunas de metadados:

SELECT
  product_id, product_name, warehouse,
  _change_type, _commit_version
FROM table_changes('products', 1)
ORDER BY _commit_version, product_id;

Por exemplo, o Spoon tem três eventos: um insert (Seattle), um update_preimage (Seattle) e um update_postimage (Los Angeles).

Note que uma única alteração lógica (por exemplo, mover a Colher para outro armazém) produz múltiplos eventos: uma pré-imagem e uma pós-imagem. Num armazém tradicional, escreveria uma MERGE instrução para reconciliar todos estes eventos numa tabela de alvo, tratando inserções, atualizações e eliminações com lógica separada, garantindo que os eventos são aplicados na ordem correta. Esta é exatamente a complexidade que AUTO CDC elimina no passo seguinte.

Passo 4: Construir uma dimensão SCD Tipo 2 com AUTO CDC

Importante

AUTO CDC está em Beta. Requer Databricks Runtime 17.3 ou superior.

Uma tabela de streaming processa os dados de forma incremental. Em cada atualização, só lê as novas linhas desde a última execução, por isso não precisa de reprocessar o conjunto de dados completo. Isto torna-o ideal para fontes de grande volume ou frequentemente mutáveis.

AUTO CDC adiciona processamento de captura de dados de alterações por cima de uma tabela de streaming. Em vez de escrever uma instrução MERGE INTO que gere manualmente inserções, atualizações e eliminações, declara a chave de negócio e a coluna de sequenciação e deixa-Azure Databricks aplicar a lógica correta. AUTO CDC também processa automaticamente eventos fora de ordem, que é um problema comum quando se utiliza MERGE INTO para lidar com eventos que chegam de sistemas distribuídos ou cargas em lote com carimbos de tempo sobrepostos.

A seguinte instrução cria uma tabela SCD Tipo 2 que preserva o histórico completo de versões de cada produto. Cada versão recebe __START_AT e __END_AT carimbos temporais. A NULL em __END_AT marca a versão atual.

CREATE OR REFRESH STREAMING TABLE products_history
SCHEDULE REFRESH EVERY 1 DAY
FLOW AUTO CDC
FROM STREAM products WITH (readChangeFeed = true)
KEYS (product_id)
APPLY AS DELETE WHEN _change_type = 'delete'
SEQUENCE BY _commit_timestamp
COLUMNS * EXCEPT (_change_type, _commit_version, _commit_timestamp)
STORED AS SCD TYPE 2;
  • SCHEDULE REFRESH EVERY 1 DAY: refresca a mesa num horário diário.
  • FLOW AUTO CDC: declara isto como um fluxo CDC. O Azure Databricks aplica automaticamente as semânticas de inserção, atualização e eliminação.
  • KEYS (product_id): a chave de negócios. Eventos com a mesma chave são fundidos em linhas versionadas.
  • APPLY AS DELETE WHEN _change_type = 'delete': fecha a versão atual quando chega um evento de eliminação. Isto permite-lhe definir a condição que identifica um evento de eliminação.
  • SEQUENCE BY _commit_timestamp: estabelece a ordenação dos eventos. Lida corretamente com chegadas fora de ordem.
  • STORED AS SCD TYPE 2: conserva toda a história. AUTO CDC suporta tanto SCD Tipo 1 como SCD Tipo 2.

Consultar a tabela de dimensões:

SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;
  • Colher: duas versões. Seattle (fechado, __END_AT set) e Los Angeles (atual, __END_AT = NULL).
  • Garfo: duas versões. Categoria de talheres (fechada) e categoria de jantar (atual).
  • Guardanapo e Porta-copos: uma versão cada (recém-inseridos, __END_AT = NULL).
  • Todos os outros produtos: uma versão para cada (__END_AT = NULL).

Passo 5: Eliminações de processos através do pipeline

Agora simule dois produtos descontinuados eliminando-os da tabela de origem:

DELETE FROM products WHERE product_id = 9;
DELETE FROM products WHERE product_id = 10;

Estes eventos de eliminação são registados no registo do CDF, mas a tabela de streaming ainda não os viu. Atualize a tabela de streaming para processar os novos eventos:

REFRESH STREAMING TABLE products_history;

Consulte a tabela de dimensões para verificar se foram aplicadas as eliminações:

SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
ORDER BY product_id, __START_AT;

Bowl e Glass estão agora encerrados com o conjunto __END_AT, sendo marcados como descontinuados. Todos os outros produtos atuais mantêm-se inalterados. A tabela de streaming só processava os novos eventos de eliminação sem reprocessar as inserções e atualizações da atualização anterior.

Passo 6: Criar uma vista agregada e materializada

Agora que tem uma tabela de dimensão que se mantém atualizada com as alterações de fonte, pode adicionar uma camada de relatórios por cima.

Uma visualização materializada armazena resultados de consultas pré-calculadas como uma tabela física. Ao contrário de uma vista normal, que reexecuta a consulta cada vez que é lida, uma vista materializada preserva os resultados e recalcula apenas as linhas afetadas por alterações a montante em cada atualização. Isto torna-o bem adequado para painéis e relatórios onde o desempenho das consultas é relevante.

CREATE OR REPLACE MATERIALIZED VIEW products_by_category
SCHEDULE REFRESH EVERY 1 DAY
AS
SELECT
  category,
  COUNT(*) AS active_products
FROM products_history
WHERE __END_AT IS NULL
GROUP BY category;

SCHEDULE REFRESH EVERY 1 DAY significa que esta vista se atualiza num horário diário. Combinado com o mesmo cronograma na tabela de streaming, tens agora um pipeline de três etapas onde as alterações à tabela de origem passam pela dimensão e entram no agregado em cada ciclo de atualização. Não há atualização manual para executar.

SELECT * FROM products_by_category ORDER BY active_products DESC;

Passo 7: Verificar a cascata de ponta a ponta

Para verificar a cascata completa do pipeline, faça uma alteração na tabela de origem:

UPDATE products SET warehouse = 'Seattle' WHERE product_id = 3;

A Faca muda-se de Denver para Seattle. Esta única alteração de DML desencadeia toda a cascata do pipeline, demonstrando como as três etapas trabalham em conjunto.

  1. products regista o evento de alteração via CDF.
  2. products_history processa o evento e adiciona uma nova versão para a faca.
  3. products_by_category recalcula apenas a fila de talheres afetada.

Verificar:

SELECT product_id, product_name, warehouse, __START_AT, __END_AT
FROM products_history
WHERE product_id = 3
ORDER BY __START_AT;

SELECT * FROM products_by_category ORDER BY active_products DESC;

Limpeza

Para limpar os recursos criados por este tutorial, use o seguinte SQL:

DROP MATERIALIZED VIEW IF EXISTS products_by_category;
DROP STREAMING TABLE IF EXISTS products_history;
DROP TABLE IF EXISTS products;

Recursos adicionais