Compartilhar via


Práticas recomendadas para pipelines declarativos do Lakeflow Spark

Esta página descreve os padrões recomendados para projetar, criar e operar pipelines com o Lakeflow Spark Declarative Pipelines. Aplique essas diretrizes ao iniciar um novo pipeline ou melhorar um existente.

Escolher o tipo de conjunto de dados correto

O Lakeflow Spark Declarative Pipelines oferece três tipos de conjunto de dados: tabelas de streaming, exibições materializadas e exibições temporárias. Escolher o tipo certo para cada camada do pipeline evita custos de computação desnecessários e mantém seu código fácil de raciocinar.

As tabelas de streaming são a opção certa para a ingestão de dados e transformações de streaming de baixa latência. Cada linha de entrada é lida e processada apenas uma vez, o que as torna ideais para cargas de trabalho de acréscimo apenas, dados de alto volume e processamento acionado por eventos a partir de armazenamento em nuvem ou barramentos de mensagens.

Exibições materializadas são a escolha certa para transformações complexas e consultas analíticas. Seus resultados são previamente computados e mantidos atualizados usando a atualização incremental, portanto, as consultas em relação a eles são rápidas. Você não pode modificar diretamente os dados em uma exibição materializada – a definição de consulta controla a saída.

Exibições temporárias são exibições com escopo de pipeline que organizam sua lógica de transformação sem materializar dados no armazenamento. Use-as para etapas intermediárias que não precisam de sua própria tabela.

A tabela a seguir resume quando usar cada tipo:

Caso de uso Tipo recomendado Reason
Ingestão do armazenamento em nuvem ou de um barramento de mensagens Tabela de streaming Processa cada registro uma vez; manuseia cargas de trabalho de alto volume e de acréscimo único.
Fluxos CDC (inserções, atualizações, exclusões) Tabela de streaming Usado como destino de APPLY CHANGES INTO para ingestão CDC ordenada e deduplicada.
Agregações e junções complexas Visão materializada Atualizado incrementalmente; evita a recomputação completa em cada atualização.
Aceleração da consulta de dados do dashboard Visão materializada Os resultados pré-computados tornam as consultas mais rápidas do que em tabelas brutas.
Transformações intermediárias (sem leitores downstream) Visualização temporária Organiza a lógica do pipeline sem incorrer no custo de armazenamento.

Para obter mais informações, consulte tabelas de streaming, visões materializadas e conceitos de Pipelines Declarativos do Lakeflow Spark.

Utilize CDC declarativo em vez de MERGE imperativo

Implementar a CDC (captura de dados de alteração) com instruções SQL MERGE imperativas requer um código personalizado significativo para lidar corretamente com ordenação de eventos, eliminação de duplicação, atualizações parciais e evolução do esquema. Cada uma dessas preocupações deve ser resolvida de forma independente e o código resultante é difícil de manter e testar.

O Lakeflow Spark Declarative Pipelines fornece a APPLY CHANGES INTO instrução (SQL) e a apply_changes() função (Python), que lidam com ordenação, desduplicação, eventos fora de ordem e evolução de esquemas declarativamente. Você descreve a forma do feed de alterações e da tabela de destino — o pipeline cuida do restante. APPLY CHANGES INTO dá suporte ao SCD Tipo 1 (substituição) e SCD Tipo 2 (preservação do histórico).

Para obter mais informações, consulte Change data capture and snapshots e The AUTO CDC APIs: Simplify change data capture with pipelines.

Reforçar a qualidade dos dados com as expectativas

As expectativas são expressões SQL verdadeiras/falsas aplicadas a cada linha que passa por um conjunto de dados. Quando uma linha falha na condição, o pipeline responde de acordo com a política de violação que você configurou. As expectativas emitem métricas para o log de eventos do pipeline, independentemente da política, para que você possa acompanhar as tendências de qualidade dos dados ao longo do tempo.

Selecionar uma política de infração

Três políticas de violação estão disponíveis. Escolha aquele que corresponda à tolerância a dados incorretos:

  • aviso (padrão): os registros que não são válidos são gravados na tabela de destino e sinalizados em métricas. Use essa política quando precisar capturar todos os dados, mas quiser visibilidade dos problemas de qualidade.
  • drop: Registros inválidos são descartados antes de serem gravados. Use isso quando linhas inválidas forem esperadas e não devem se propagar downstream.
  • fail: a atualização do pipeline é interrompida no primeiro registro inválido. Use isso para dados críticos em que qualquer registro problemático indica um sério problema nas etapas anteriores.

Os exemplos a seguir mostram cada política aplicada a uma tabela de streaming:

SQL

-- Warn: write invalid records but track them in metrics
CREATE OR REFRESH STREAMING TABLE orders_raw (
  CONSTRAINT valid_order_id EXPECT (order_id IS NOT NULL)
) AS SELECT * FROM STREAM read_files("/volumes/raw/orders", format => "json");

-- Drop: discard invalid records before writing
CREATE OR REFRESH STREAMING TABLE orders_clean (
  CONSTRAINT non_negative_amount EXPECT (amount >= 0) ON VIOLATION DROP ROW
) AS SELECT * FROM STREAM(orders_raw);

-- Fail: stop the pipeline on any invalid record
CREATE OR REFRESH STREAMING TABLE orders_critical (
  CONSTRAINT required_customer_id EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE
) AS SELECT * FROM STREAM(orders_clean);

Python

from pyspark import pipelines as dp

# Warn: write invalid records but track them in metrics
@dp.table
@dp.expect("valid_order_id", "order_id IS NOT NULL")
def orders_raw():
    return spark.readStream.format("cloudFiles") \
        .option("cloudFiles.format", "json") \
        .load("/volumes/raw/orders")

# Drop: discard invalid records before writing
@dp.table
@dp.expect_or_drop("non_negative_amount", "amount >= 0")
def orders_clean():
    return spark.readStream.table("orders_raw")

# Fail: stop the pipeline on any invalid record
@dp.table
@dp.expect_or_fail("required_customer_id", "customer_id IS NOT NULL")
def orders_critical():
    return spark.readStream.table("orders_clean")

Colocar registros inválidos em quarentena

Quando você quiser preservar os registros descartados para investigação, ao invés de descartá-los silenciosamente, use um padrão de quarentena. Encaminhe linhas que falham na validação para uma tabela de streaming separada usando dois fluxos: um que descarta linhas inválidas da tabela principal e um segundo que grava apenas as linhas inválidas em uma tabela de quarentena. Isso permite que você investigue, corrija e reprocesse dados incorretos sem contaminar seu conjunto de dados limpo.

Para obter um exemplo detalhado do padrão de quarentena, consulte recomendações de expectativa e padrões avançados.

Para obter mais informações sobre expectativas, consulte Gerenciar a qualidade dos dados com as expectativas do pipeline.

Parametrizar seus pipelines

Os pipelines têm configurações padrão de catálogo e esquema, portanto, o código que lê e grava no mesmo catálogo e esquema funciona em ambientes sem parâmetros. No entanto, se seu pipeline de dados precisar referenciar um segundo catálogo ou esquema, como ler de um catálogo de origem compartilhado que varia entre ambientes de desenvolvimento e produção, evite codificar esses nomes diretamente no código-fonte. Em vez disso, defina-os como parâmetros de configuração de pipeline (pares chave-valor definidos nas configurações de pipeline) e referencie-os em seu código. Isso permite que uma única base de código seja executada corretamente entre ambientes trocando os valores de parâmetro.

SQL

CREATE OR REFRESH MATERIALIZED VIEW transaction_summary AS
SELECT account_id, COUNT(txn_id) AS txn_count, SUM(amount) AS total_amount
FROM ${source_catalog}.sales.transactions
GROUP BY account_id;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import count, sum

@dp.materialized_view
def transaction_summary():
    source_catalog = spark.conf.get("source_catalog")
    return spark.read.table(f"{source_catalog}.sales.transactions") \
        .groupBy("account_id") \
        .agg(
            count("txn_id").alias("txn_count"),
            sum("amount").alias("total_amount")
        )

Para obter mais informações, consulte Como usar parâmetros com pipelines.

Escolha o modo de pipeline correto para cada ambiente

Modos de atualização de desenvolvimento e produção

Os pipelines são executados no modo de desenvolvimento ou no modo de produção. Escolha o modo que corresponde à sua meta.

No modo de desenvolvimento, o pipeline reutiliza um cluster de longa execução entre atualizações e não tenta operações novamente em caso de erro. Isso acelera o ciclo de iteração quando você está criando e testando o código do pipeline porque obtém detalhes de erro imediatamente sem esperar pelas reinicializações do cluster.

No modo de produção, o cluster é desligado imediatamente após a conclusão de cada atualização, o que reduz os custos de computação. O pipeline também aplica tentativas cada vez mais frequentes, incluindo reinicializações de cluster, para lidar automaticamente com falhas transitórias de infraestrutura. Utilize o modo de produção para todas as execuções de pipelines agendadas.

Acionado versus modo de pipeline contínuo

O modo disparado processa todos os dados disponíveis e, em seguida, é interrompido. É a escolha certa para a grande maioria dos pipelines: aqueles que operam conforme um cronograma (por hora, diariamente ou sob demanda) e não exigem atualização de dados em menos de um minuto.

O modo contínuo mantém o cluster em execução e processa novos dados conforme ele chega. É apropriado somente quando seu caso de uso requer latência no intervalo de segundos a minutos. Como o modo contínuo requer um cluster sempre ativo, ele é significativamente mais caro do que o modo acionado.

Para obter mais informações, consulte Modo de pipeline acionado vs. contínuo e Configurar pipelines.

Usar clustering líquido para disposição de dados

O agrupamento dinâmico substitui o particionamento estático e o uso de ZORDER para otimizar o layout de dados em tabelas Delta. Ao contrário do particionamento, que exige que você escolha uma coluna de partição antecipadamente e possa causar distorção de dados quando os valores são distribuídos de forma desigual, o clustering líquido é autoajustado, resistente a distorções e incremental – somente os dados que precisam de reorganização são reescritos em cada execução.

Altere as colunas de clustering a qualquer momento sem reescrever a tabela completa à medida que os padrões de consulta evoluem.

Defina colunas de agrupamento na definição da tabela de streaming:

SQL

CREATE OR REFRESH STREAMING TABLE events
CLUSTER BY (event_date, region)
AS SELECT * FROM STREAM read_files("/volumes/raw/events", format => "parquet");

Python

from pyspark import pipelines as dp

@dp.table(cluster_by=["event_date", "region"])
def events():
    return spark.readStream.format("cloudFiles") \
        .option("cloudFiles.format", "parquet") \
        .load("/volumes/raw/events")

Se você não tiver certeza de quais colunas agrupar por, use CLUSTER BY AUTO para permitir que o Databricks selecione automaticamente as colunas de agrupamento ideais baseado no seu fluxo de consultas.

Para obter mais informações, consulte As tabelas de streaming e use o clustering líquido para tabelas.

Gerenciar pipelines com CI/CD e Pacotes de Automação Declarativa

Gerencie o controle de versão do código-fonte do pipeline e utilize Pacotes Declarativos de Automação para administrar implantações em diferentes ambientes.

Para obter mais informações, consulte Criar um pipeline controlado pela origem, Converter um pipeline em um projeto de pacote e usar parâmetros com pipelines.

Armazenar código de fluxo de trabalho no sistema de controle de versão

Armazene todos os arquivos de origem do pipeline (Python e SQL) junto com a configuração do pacote em um repositório Git. O controle de versão do projeto completo oferece um histórico completo de alterações, facilita a colaboração e permite que você valide as alterações em um ambiente de desenvolvimento antes de promovê-las à produção.

O Databricks recomenda Pacotes de Automação Declarativa para gerenciar esse fluxo de trabalho. Um pacote define sua configuração de pipeline no YAML junto com o código-fonte e a databricks bundle CLI permite validar, implantar e executar pipelines do terminal ou de um sistema de CI/CD.

Usar destinos de pacote para isolamento de ambiente

Os pacotes permitem vários alvos (por exemplo, dev, staging, prod), cada um com seu próprio conjunto de sobreposições para nomes de catálogo, políticas de cluster, endereços de notificação e outras configurações. Combine os alvos de pacote com parâmetros de pipeline para injetar os valores corretos específicos do ambiente no momento da implantação, mantendo seu código-fonte livre de constantes de configuração de ambiente.

Um fluxo de trabalho típico tem esta aparência:

  1. Um desenvolvedor trabalha em um branch de funcionalidade, implantando em um pipeline de desenvolvimento individual em um catálogo de desenvolvimento do desenvolvedor.
  2. Ao fazer merge ao branch principal, um sistema de CI executa databricks bundle validate e databricks bundle deploy --target staging para validar e implantar o pipeline em um ambiente de homologação.
  3. Após a conclusão dos testes, o sistema de Integração Contínua (CI) é implantado em produção com databricks bundle deploy --target prod.

Práticas recomendadas de streaming

Use esses padrões para gerenciar o estado, controlar dados atrasados e manter os pipelines de streaming confiáveis.

Para obter mais informações, consulte Otimizar o processamento com estado dinâmico com marcas d'água, recuperar um pipeline de uma falha de ponto de verificação em streaming e Preencher dados históricos com pipelines.

Usar marcas d'água para operações com estado

As marcas d'água delimitam o estado que o pipeline mantém na memória durante operações de streaming stateful, como agregações em janelas e desduplicação. Sem uma marca d'água, o estado cresce sem limites à medida que o pipeline acumula dados para cada chave possível, eventualmente causando erros de falta de memória em pipelines de longa execução.

Uma marca temporal especifica uma coluna de carimbo de data/hora e um limite de tolerância para dados recebidos com atraso. Os registros que chegam depois que o limite é passado são descartados. Escolha um limite que balancee sua tolerância a dados atrasados em relação ao custo de memória de manter esse estado aberto.

O exemplo a seguir calcula uma agregação de janela deslizante de um minuto com uma marca d'água de três minutos.

SQL

CREATE OR REFRESH STREAMING TABLE event_counts AS
SELECT window(event_time, '1 minute') AS time_window, region, COUNT(*) AS cnt
FROM STREAM(events_raw)
  WATERMARK event_time DELAY OF INTERVAL 3 MINUTES
GROUP BY time_window, region;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import window

@dp.table
def event_counts():
    return (
        spark.readStream.table("events_raw")
            .withWatermark("event_time", "3 minutes")
            .groupBy(window("event_time", "1 minute"), "region")
            .count()
    )

Observação

Para garantir que as agregações sejam processadas incrementalmente em vez de serem plenamente recomputadas em cada atualização, você deve definir uma marca d'água.

Entender o estado de streaming e a atualização completa

O estado de streaming é incremental: o pipeline constrói e mantém o estado através das atualizações, em vez de recalcular do zero a cada vez. Isso é o que torna o streaming stateful eficiente, mas também significa que se você alterar a lógica de uma consulta com estado (por exemplo, modificar um limite de watermark ou alterar colunas de agregação), o estado existente não será mais compatível com a nova lógica. Nesse caso, você deve executar uma atualização completa para reprocessar todos os dados históricos com a nova lógica e recompilar o estado do zero.

Uma atualização completa também poderá levar à perda de dados se a origem não reter dados históricos. Por exemplo, uma fonte kafka com um curto período de retenção pode ter apenas os últimos minutos de dados disponíveis no momento da atualização, resultando em uma tabela que contém muito menos dados do que antes. Planeje alterações de lógica de consulta com estado cuidadosamente, especialmente para fluxos de alto volume em que uma atualização completa é cara ou em que a fonte tem retenção de dados limitada. O uso da arquitetura de medalhão ajuda ao criar tabelas de bronze com transformação mínima e permite que as tabelas prata ou ouro sejam recalculadas a partir das tabelas de bronze mantendo todo o histórico de dados.

Junções entre fluxos

As junções entre fluxos de dados exigem uma marca d'água em ambos os lados do join e uma condição de junção com limite de tempo. O intervalo de tempo na condição de junção informa ao mecanismo de streaming quando não há mais correspondências possíveis, permitindo que ele remova o estado que não pode mais ser correspondido. Se você omitir os marcadores ou a condição temporal, o estado crescerá sem limites.

O exemplo a seguir une eventos de impressão de anúncios com eventos de clique, exigindo que o clique ocorra dentro de três minutos após a impressão:

SQL

CREATE OR REFRESH STREAMING TABLE impression_clicks AS
SELECT imp.ad_id, imp.impression_time, clk.click_time
FROM STREAM(ad_impressions)
    WATERMARK impression_time DELAY OF INTERVAL 3 MINUTES AS imp
JOIN STREAM(user_clicks)
    WATERMARK click_time DELAY OF INTERVAL 3 MINUTES AS clk
ON imp.ad_id = clk.ad_id
  AND clk.click_time BETWEEN imp.impression_time
    AND imp.impression_time + INTERVAL 3 MINUTES;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import expr

dp.create_streaming_table("impression_clicks")

@dp.append_flow(target="impression_clicks")
def join_impressions_and_clicks():
    impressions = spark.readStream.table("ad_impressions") \
        .withWatermark("impression_time", "3 minutes")
    clicks = spark.readStream.table("user_clicks") \
        .withWatermark("click_time", "3 minutes")
    return impressions.alias("imp").join(
        clicks.alias("clk"),
        expr("""
            imp.ad_id = clk.ad_id AND
            clk.click_time BETWEEN imp.impression_time AND imp.impression_time + INTERVAL 3 MINUTES
        """),
        "leftOuter"
    )

Quando você combina um fluxo com uma tabela estática (uma junção do tipo instantâneo), o instantâneo da tabela estática é atualizado novamente no início de cada micro-lote. Isso significa que os registros de dimensões que chegam tardiamente não são aplicados retroativamente a fatos que já foram processados. Se for necessária a aplicação retroativa, use uma visão materializada ou reestruture o pipeline.

Otimizar o desempenho do pipeline

Aplique essas técnicas para reduzir os custos de computação e acelerar as atualizações de pipeline.

Para obter mais informações, consulte Exibições materializadas e Otimização do processamento de estado com marcas d'água.

Evitar arquivos pequenos

Acionar um pipeline frequentemente em uma fonte de baixa taxa de dados escreve um grande número de arquivos pequenos no armazenamento em nuvem. Arquivos pequenos degradam o desempenho de leitura porque cada arquivo requer uma pesquisa de metadados separada e uma viagem de ida e volta de E/S, e as APIs de armazenamento em nuvem limitam as operações de listagem em escala. Para evitar isso, escolha um intervalo de gatilho que corresponda ao volume de dados: execute pipelines disparados em um agendamento que permita que uma quantidade significativa de dados se acumule entre atualizações, em vez de continuamente.

Manipular distorção de dados

A distorção de dados ocorre quando os valores em uma chave join ou groupBy são distribuídos de forma desigual entre partições, fazendo com que um pequeno número de tarefas processe a maioria dos dados. Isso cria hotspots que aumentam o tempo de atualização de ponta a ponta. Use o clustering líquido para solucionar distorção em tabelas armazenadas. Para distorção que ocorre durante a computação em voo, salte chaves altamente distorcidas adicionando um sufixo de bucket aleatório antes de agrupar e agregar em dois estágios.

Para obter mais informações, consulte Utilizar agrupamento líquido para a disposição dos dados.

Usar atualização incremental para exibições materializadas

Quando você usa uma visualização materializada para uma agregação grande, o Lakeflow Spark Declarative Pipelines tenta atualizá-la de forma incremental, processando apenas as modificações ascendentes desde a última atualização, em vez de recomputar o conjunto completo de resultados. A atualização incremental é significativamente mais barata do que reexecutar a consulta desde o início em cada gatilho de pipeline. Para maximizar a chance de que uma exibição materializada possa ser atualizada incrementalmente, escreva consultas de agregação simples e determinísticas e evite construções que impeçam o processamento incremental, como funções não determinísticas.

Confira Atualização incremental para exibições materializadas.

Otimizar junções

Para junções em que um lado é uma pequena tabela de dimensões, adicione uma dica de difusão para instruir o Spark a transmitir a tabela menor para todos os executores em vez de executar uma junção aleatória:

SQL

CREATE OR REFRESH MATERIALIZED VIEW enriched_orders AS
SELECT o.*, /*+ BROADCAST(p) */ p.product_name, p.category
FROM orders o
JOIN products p ON o.product_id = p.product_id;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast

@dp.materialized_view
def enriched_orders():
    orders = spark.read.table("orders")
    products = spark.read.table("products")
    return orders.join(broadcast(products), "product_id")

Para junções por proximidade em séries temporais (por exemplo, encontrar o evento mais próximo dentro de um intervalo de tempo), use uma condição de junção de intervalo e verifique se ambos os lados têm uma marca d'água se ingressarem em fluxos, ou considere pré-agrupamento de eventos em intervalos de tempo antes de ingressar.

Monitor os seus pipelines

O log de eventos do pipeline é o principal elemento de observabilidade nos Pipelines Declarativos do Lakeflow Spark. Cada execução de um pipeline grava registros estruturados no log de eventos, abrangendo o progresso da execução, os resultados das verificações de qualidade dos dados, a proveniência dos dados e os detalhes do erro. O log de eventos é uma tabela Delta que você pode consultar diretamente.

Para consultar o log de eventos sem conhecer o caminho de armazenamento subjacente, use a event_log() função com valor de tabela em um cluster compartilhado ou sql warehouse:

SELECT * FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC
LIMIT 100;

Crie painéis de qualidade de dados consultando o log de eventos para as métricas de expectativa. A details coluna contém uma estrutura JSON aninhada com contagens de aprovação/falha para cada restrição, que você pode usar para acompanhar tendências de qualidade ao longo do tempo e alertar sobre regressões.

Para alertas controlados por eventos, use ganchos de evento para disparar webhooks personalizados ou serviços de notificação (como Slack ou PagerDuty) quando um pipeline falha ou quando um limite de qualidade de dados é violado. Ganchos de evento são funções do Python que são executadas em resposta a eventos de pipeline.

Para obter mais informações, consulte Monitoramento de pipelines, Log de eventos de pipelines e Definir monitoramento personalizado para pipelines com ganchos de eventos.

Usar computação sem servidor

O Databricks recomenda a computação sem servidor para novos pipelines. Com o modelo sem servidor, não há nenhuma configuração manual de cluster — o Databricks gerencia a infraestrutura automaticamente. Os pipelines sem servidor usam dimensionamento automático aprimorado que pode dimensionar horizontalmente (mais executores) e verticalmente (tamanho maior do executor) em resposta às demandas de carga de trabalho. Os pipelines sem servidor sempre usam o Catálogo do Unity, portanto, a governança e o acompanhamento de linhagem são integrados por padrão.

Para obter mais informações, consulte Configurar um pipeline sem servidor.

Organizar pipelines com a arquitetura de medalhão

A arquitetura do medalhão organiza dados em três camadas lógicas – bronze, prata e ouro – cada uma com uma finalidade distinta. Mapear tipos de conjunto de dados do Lakeflow Spark Declarative Pipelines para a camada certa mantém as responsabilidades de cada camada claras e torna os pipelines mais fáceis de manter.

  • Bronze: Use tabelas de streaming em tempo real para ingerir dados brutos do armazenamento em nuvem, barramentos de mensagens ou fontes CDC. As tabelas bronze preservam os dados brutos de origem com transformação mínima, possibilitando que camadas de prata ou ouro sejam reprocessadas da origem na camada bronze se os requisitos forem alterados.
  • Prata: Use tabelas de streaming para transformações incrementais no nível de linha (filtragem, limpeza e análise sintática). Use visões materializadas quando a lógica da camada silver envolve junções de enriquecimento em tabelas de dimensão ou agregações complexas que se beneficiam da atualização incremental.
  • Ouro: use visões materializadas para pré-computar agregações, métricas e resumos servidos a dashboards, ferramentas de relatórios e consumidores a jusante.

Ingestão separada (bronze) e transformação (prata e ouro) em DAGs de pipeline distintos sempre que possível. Desacoplar as camadas permite agendar, monitorar e diagnosticar cada camada independentemente, e uma falha em um pipeline de transformação não impede que novos dados cheguem ao nível bronze.

Para obter mais informações, consulte Tabelas de streaming e exibições materializadas.