Partilhar via


Boas práticas para Pipelines Declarativos do Lakeflow Spark

Esta página descreve padrões recomendados para o desenho, construção e operação de oleodutos com Lakeflow Spark Declarative Pipelines. Aplique estas orientações ao iniciar um novo gasoduto ou melhorar um existente.

Escolha o tipo de conjunto de dados certo

Os Lakeflow Spark Declarative Pipelines oferecem três tipos de conjuntos de dados: tabelas de streaming, visualizações materializadas e vistas temporárias. Escolher o tipo certo para cada camada do seu pipeline evita custos de computação desnecessários e mantém o seu código fácil de raciocinar.

As tabelas de streaming são a escolha certa para ingestão de dados e transformações de streaming de baixa latência. Cada linha de entrada é lida e processada apenas uma vez, o que as torna ideais para cargas de trabalho de anexação apenas, altos volumes de dados e processamento orientado por eventos a partir de armazenamento na cloud ou buses de mensagens.

As vistas materializadas são a escolha certa para transformações complexas e consultas analíticas. Os seus resultados são pré-calculados e mantidos atualizados através de atualização incremental, pelo que as consultas contra eles são rápidas. Não pode modificar diretamente os dados numa vista materializada — a definição da consulta controla a saída.

Vistas temporárias são vistas com escopo de pipeline que organizam a sua lógica de transformação sem materializar dados em armazenamento. Use-os para passos intermediários que não precisam de uma tabela própria.

A tabela seguinte resume quando usar cada tipo:

Caso de utilização Tipo recomendado Justificação
Ingestão a partir de armazenamento em nuvem ou de um barramento de mensagens Tabela de transmissão Processa cada registo uma vez; Trata de trabalhos de grande volume e apenas de acréscimos.
Transmissões CDC (inserções, atualizações, eliminações) Tabela de transmissão Usado como alvo para APPLY CHANGES INTO a ingestão ordenada e desduplicada do CDC.
Agregações e junções complexas Visão materializada Renovado gradualmente; Evita a recomputação total a cada atualização.
Aceleração de consultas de dashboard Visão materializada Resultados pré-computados tornam as consultas mais rápidas do que contra tabelas brutas.
Transformações intermédias (sem leitores downstream) Vista temporária Organiza a lógica do pipeline sem incorrer em custos de armazenamento.

Para mais informações, consulte Tabelas de Streaming, Vistas Materializadas e conceitos relacionados a Pipelines Declarativos do Lakeflow Spark.

Use CDC declarativo em vez de merge imperativo

Implementar a captura de dados de alterações (CDC) com instruções SQL MERGE imperativas requer código personalizado significativo para gerir corretamente a ordenação de eventos, a deduplicação, atualizações parciais e a evolução do esquema. Cada uma destas preocupações deve ser resolvida de forma independente, e o código resultante é difícil de manter e testar.

Os "Lakeflow Spark Declarative Pipelines" fornecem a instrução APPLY CHANGES INTO (SQL) e a função apply_changes() (Python), que tratam de forma declarativa da ordenação, deduplicação, eventos fora de ordem e evolução do esquema. Descreve a forma do feed de alterações e da tabela alvo — o pipeline trata do resto. APPLY CHANGES INTO suporta tanto SCD Tipo 1 (sobrescrição) como SCD Tipo 2 (preservação de histórico).

Para mais informações, consulte Captura de alterações e snapshots de dados e Os APIs AUTO CDC: Simplifique a captura de alterações de dados com pipelines.

Impor a qualidade dos dados com expectativas

As expectativas são expressões SQL verdadeiras/falsas aplicadas a cada linha que passa por um conjunto de dados. Quando uma linha falha na condição, o pipeline responde de acordo com a política de violação que configuraste. As expectativas emitem métricas para o log de eventos do pipeline independentemente da política, permitindo que se acompanhem as tendências de qualidade dos dados ao longo do tempo.

Escolha uma política de infrações

Existem três políticas de infrações disponíveis. Escolhe aquele que corresponde à tua tolerância a dados errados:

  • aviso (default): Registos que não são válidos são escritos na tabela alvo e assinalados em métricas. Use esta política quando precisar de recolher todos os dados mas quiser visibilidade sobre questões de qualidade.
  • drop: Registos que não são válidos são descartados antes de serem escritos. Use isto quando se esperam linhas defeituosas e não devem propagar-se a jusante.
  • fail: A atualização do pipeline interrompe no primeiro registo inválido. Utilize isto para dados críticos onde qualquer registo incorreto indica um problema sério na origem.

Os exemplos seguintes mostram cada política aplicada a uma tabela de streaming:

SQL

-- Warn: write invalid records but track them in metrics
CREATE OR REFRESH STREAMING TABLE orders_raw (
  CONSTRAINT valid_order_id EXPECT (order_id IS NOT NULL)
) AS SELECT * FROM STREAM read_files("/volumes/raw/orders", format => "json");

-- Drop: discard invalid records before writing
CREATE OR REFRESH STREAMING TABLE orders_clean (
  CONSTRAINT non_negative_amount EXPECT (amount >= 0) ON VIOLATION DROP ROW
) AS SELECT * FROM STREAM(orders_raw);

-- Fail: stop the pipeline on any invalid record
CREATE OR REFRESH STREAMING TABLE orders_critical (
  CONSTRAINT required_customer_id EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE
) AS SELECT * FROM STREAM(orders_clean);

Python

from pyspark import pipelines as dp

# Warn: write invalid records but track them in metrics
@dp.table
@dp.expect("valid_order_id", "order_id IS NOT NULL")
def orders_raw():
    return spark.readStream.format("cloudFiles") \
        .option("cloudFiles.format", "json") \
        .load("/volumes/raw/orders")

# Drop: discard invalid records before writing
@dp.table
@dp.expect_or_drop("non_negative_amount", "amount >= 0")
def orders_clean():
    return spark.readStream.table("orders_raw")

# Fail: stop the pipeline on any invalid record
@dp.table
@dp.expect_or_fail("required_customer_id", "customer_id IS NOT NULL")
def orders_critical():
    return spark.readStream.table("orders_clean")

Quarentena de registos inválidos

Quando quiser preservar registos abandonados para investigação em vez de os descartar silenciosamente, use um padrão de quarentena. Encaminhe as linhas que falham na validação para uma tabela de streaming separada usando dois fluxos: um que elimina linhas inválidas da tabela principal e outro que escreve apenas as linhas inválidas numa tabela de quarentena. Isto permite-lhe investigar, corrigir e reprocessar dados errados sem contaminar o seu conjunto de dados limpo.

Para um exemplo detalhado do padrão de quarentena, veja Recomendações de expectativas e padrões avançados.

Para mais informações sobre expectativas, consulte Gerir a qualidade dos dados com expectativas do pipeline.

Parametrize os seus pipelines

Os pipelines têm definições padrão de catálogo e esquema, por isso o código que lê e escreve dentro do mesmo catálogo e esquema funciona entre ambientes sem quaisquer parâmetros. No entanto, se o seu pipeline precisar de referenciar um segundo catálogo ou esquema — por exemplo, ler de um catálogo de fonte partilhado que difere entre desenvolvimento e produção — evite codificar esses nomes diretamente no seu código-fonte. Em vez disso, defina-os como parâmetros de configuração do pipeline (pares-chave-valor definidos nas definições do pipeline) e referencia-os no seu código. Isto permite que uma única base de código corra corretamente entre ambientes ao trocar os valores dos parâmetros.

SQL

CREATE OR REFRESH MATERIALIZED VIEW transaction_summary AS
SELECT account_id, COUNT(txn_id) AS txn_count, SUM(amount) AS total_amount
FROM ${source_catalog}.sales.transactions
GROUP BY account_id;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import count, sum

@dp.materialized_view
def transaction_summary():
    source_catalog = spark.conf.get("source_catalog")
    return spark.read.table(f"{source_catalog}.sales.transactions") \
        .groupBy("account_id") \
        .agg(
            count("txn_id").alias("txn_count"),
            sum("amount").alias("total_amount")
        )

Para mais informações, veja Usar parâmetros com pipelines.

Escolha o modo de pipeline certo para cada ambiente

Modos de atualização de desenvolvimento e produção

Os pipelines correm em modo de desenvolvimento ou de atualização em produção . Escolhe o modo que corresponda ao teu objetivo.

Em modo de desenvolvimento, o pipeline reutiliza um cluster de longa duração entre atualizações e não faz novas tentativas em caso de erros. Isto acelera o ciclo de iteração quando estás a criar e testar código do pipeline porque recebes detalhes de erro imediatamente sem esperar pelos reinícios do cluster.

Em modo de produção, o cluster desliga-se rapidamente após a conclusão de cada atualização, o que reduz os custos de computação. O pipeline também aplica tentativas progressivas, incluindo a reinicialização automática dos clusters, para lidar com falhas transitórias da infraestrutura. Use o modo de produção para todas as execuções de pipeline agendadas.

Modo de pipeline desencadeado vs. contínuo

O modo acionado processa todos os dados disponíveis e depois para. É a escolha certa para a grande maioria dos pipelines: aqueles que funcionam com um horário (horário, diário ou sob demanda) e que não exigem atualização de dados abaixo de um minuto.

O modo contínuo mantém o cluster a funcionar e processa novos dados à medida que chegam. Só é apropriado quando o seu caso de uso exige latência na ordem dos segundos aos minutos. Como o modo contínuo requer um cluster sempre ligado, é significativamente mais caro do que o modo acionado.

Para mais informações, consulte Modo de tubulação desencadeado vs. contínuo e Configurando Tubulações.

Use agrupamento líquido para o layout de dados

A clusterização líquida substitui a partição estática e ZORDER para otimizar o layout dos dados em tabelas Delta. Ao contrário do particionamento, que exige escolher uma coluna de partição desde o início e pode causar desfasamento de dados quando os valores estão distribuídos de forma desigual, o agrupamento líquido é auto-ajustável, resistente ao desfasamento e incremental — apenas os dados que precisam de reorganização são reescritos em cada execução.

Mude as colunas de agrupamento a qualquer momento sem reescrever a tabela completa à medida que os padrões de consulta evoluem.

Defina colunas de agrupamento na definição da sua tabela de streaming.

SQL

CREATE OR REFRESH STREAMING TABLE events
CLUSTER BY (event_date, region)
AS SELECT * FROM STREAM read_files("/volumes/raw/events", format => "parquet");

Python

from pyspark import pipelines as dp

@dp.table(cluster_by=["event_date", "region"])
def events():
    return spark.readStream.format("cloudFiles") \
        .option("cloudFiles.format", "parquet") \
        .load("/volumes/raw/events")

Se não tiveres a certeza de que colunas agrupar, usa CLUSTER BY AUTO para deixar que o Databricks selecione automaticamente as colunas de clustering ótimas com base na carga de trabalho da tua consulta.

Para mais informações, consulte Tabelas de streaming e Usar agregação dinâmica para tabelas.

Gerencie pipelines com CI/CD e Pacotes de Automação Declarativa

Controle as versões do código-fonte do teu pipeline e usa Pacotes de Automação Declarativa para gerir implementações em vários ambientes.

Para mais informações, consulte Criar um pipeline controlado por código-fonte, Converter um pipeline num projeto bundle e Usar parâmetros com pipelines.

Armazenar código do pipeline num sistema de controlo de versões

Armazena todos os ficheiros fonte do pipeline (Python e SQL) juntamente com a configuração do teu bundle num repositório Git. O controlo de versões do projeto completo dá-lhe um histórico completo das alterações, facilita a colaboração e permite validar alterações num ambiente de desenvolvimento antes de as promover para produção.

A Databricks recomenda os Pacotes de Automação Declarativa para gerir este fluxo de trabalho. Um bundle define a configuração do teu pipeline em YAML juntamente com o código-fonte, e a databricks bundle CLI permite-te validar, implementar e executar pipelines a partir do teu terminal ou de um sistema CI/CD.

Use alvos de pacote para isolamento de ambiente

Os bundles permitem múltiplos alvos (por exemplo, dev, staging, prod), cada um com o seu próprio conjunto de sobreposições para nomes de catálogo, políticas de cluster, endereços de notificação e outras definições. Combine os alvos de agrupamento com os parâmetros do pipeline para injetar os valores adequados para o ambiente no momento da implementação, mantendo o seu código-fonte livre de constantes de ambiente.

Um fluxo de trabalho típico é o seguinte:

  1. Um programador trabalha num ramo de funcionalidades, implementando para um pipeline de desenvolvimento pessoal num catálogo de desenvolvimento.
  2. Na integração para o ramo principal, um sistema de CI executa databricks bundle validate e databricks bundle deploy --target staging para validar e implementar o pipeline num ambiente de pré-produção.
  3. Após a conclusão dos testes, o sistema CI é implantado em produção com databricks bundle deploy --target prod.

Melhores práticas de streaming

Use estes padrões para gerir o estado, controlar dados atrasados e manter pipelines de streaming fiáveis.

Para mais informações, consulte Otimizar processamento com estado e marcas de água, Recuperar um pipeline a partir de falha de checkpoint de streaming, e Preenchimento de dados históricos com pipelines.

Use marcas de água para operações com estado

Marcas de água limitam o estado que o pipeline mantém na memória durante operações de streaming com estado, como agregações em janelas e deduplicação. Sem uma marca de água, o estado expande-se sem limites à medida que o pipeline acumula dados para todas as chaves possíveis, eventualmente levando a erros de falta de memória em pipelines que operam por longos períodos.

Um marcador especifica uma coluna de carimbo temporal e um limiar de tolerância para dados atrasados. Os registos que chegam depois de o limiar ter sido ultrapassado são eliminados. Escolhe um limiar que equilibre a tua tolerância a dados atrasados com o custo de memória de manter esse estado aberto.

O exemplo seguinte calcula uma agregação de janela de um minuto usando uma janela deslizante com uma marca temporal de três minutos:

SQL

CREATE OR REFRESH STREAMING TABLE event_counts AS
SELECT window(event_time, '1 minute') AS time_window, region, COUNT(*) AS cnt
FROM STREAM(events_raw)
  WATERMARK event_time DELAY OF INTERVAL 3 MINUTES
GROUP BY time_window, region;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import window

@dp.table
def event_counts():
    return (
        spark.readStream.table("events_raw")
            .withWatermark("event_time", "3 minutes")
            .groupBy(window("event_time", "1 minute"), "region")
            .count()
    )

Observação

Para garantir que as agregações são processadas de forma incremental em vez de totalmente recalculadas em cada atualização, é necessário definir uma marca de referência.

Compreender o estado do streaming e a atualização completa

O estado de streaming é incremental: o pipeline constrói e mantém o estado ao longo das atualizações, em vez de recalcular do zero a cada vez. Isto é o que torna o streaming stateful eficiente, mas também significa que, se mudar a lógica de uma consulta stateful (por exemplo, modificar um limiar de watermark ou alterar colunas de agregação), o estado existente deixa de ser compatível com a nova lógica. Neste caso, deve realizar uma atualização completa para reprocessar todos os dados históricos com a nova lógica e reconstruir o estado do zero.

Uma atualização completa também pode levar à perda de dados se a fonte não retiver dados históricos. Por exemplo, uma fonte Kafka com um período de retenção curto pode ter apenas os últimos minutos de dados disponíveis no momento da atualização, resultando numa tabela que contém muito menos dados do que antes. Planeie cuidadosamente alterações na lógica de consulta com estado, especialmente para fluxos de alto volume, onde uma atualização completa é dispendiosa ou onde a fonte tem retenção limitada de dados. A utilização da medallion architecture facilita ao criar tabelas de bronze com transformações mínimas e permite que as tabelas de prata ou ouro sejam recalculadas a partir das tabelas de bronze com todo o histórico.

Stream-stream junta-se

As junções stream-stream requerem uma marca de água em ambos os lados da junção e uma condição de junção com limite temporal. O intervalo de tempo na condição de junção indica ao motor de stream quando não são possíveis mais correspondências, permitindo-lhe remover o estado que já não pode ser correspondido. Se omitir as marcas de água ou a condição temporal, o estado cresce sem limites.

O exemplo seguinte junta eventos de impressão publicitária com eventos de cliques, exigindo que o clique ocorra dentro de três minutos após a impressão:

SQL

CREATE OR REFRESH STREAMING TABLE impression_clicks AS
SELECT imp.ad_id, imp.impression_time, clk.click_time
FROM STREAM(ad_impressions)
    WATERMARK impression_time DELAY OF INTERVAL 3 MINUTES AS imp
JOIN STREAM(user_clicks)
    WATERMARK click_time DELAY OF INTERVAL 3 MINUTES AS clk
ON imp.ad_id = clk.ad_id
  AND clk.click_time BETWEEN imp.impression_time
    AND imp.impression_time + INTERVAL 3 MINUTES;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import expr

dp.create_streaming_table("impression_clicks")

@dp.append_flow(target="impression_clicks")
def join_impressions_and_clicks():
    impressions = spark.readStream.table("ad_impressions") \
        .withWatermark("impression_time", "3 minutes")
    clicks = spark.readStream.table("user_clicks") \
        .withWatermark("click_time", "3 minutes")
    return impressions.alias("imp").join(
        clicks.alias("clk"),
        expr("""
            imp.ad_id = clk.ad_id AND
            clk.click_time BETWEEN imp.impression_time AND imp.impression_time + INTERVAL 3 MINUTES
        """),
        "leftOuter"
    )

Quando associas um fluxo a uma tabela estática (uma imagem instantânea), a imagem instantânea da tabela estática é atualizada no início de cada microbatelada. Isto significa que os registos de dimensão que chegam tarde não são aplicados retroativamente a factos que já foram processados. Se for necessária aplicação retroativa, utilize uma vista materializada ou reestruture a pipeline.

Otimizar o desempenho do pipeline

Aplicar estas técnicas para reduzir os custos de computação e acelerar as atualizações do pipeline.

Para mais informações, veja Vistas materializadas e Otimizar processamento com estado com marcas de água.

Evite ficheiros pequenos

Desencadear um pipeline com demasiada frequência numa fonte de baixo volume grava um grande número de ficheiros pequenos para armazenamento na cloud. Ficheiros pequenos degradam o desempenho de leitura porque cada ficheiro requer uma pesquisa de metadados e uma viagem de ida e volta separadas, e as APIs de armazenamento na cloud limitam as operações de listagem em escala. Para evitar isto, escolha um intervalo de gatilho que corresponda ao seu volume de dados: execute pipelines desencadeados num calendário que permita acumular uma quantidade significativa de dados entre atualizações, em vez de continuamente.

Lidar com desvio de dados

O desvio de dados ocorre quando os valores numa chave join ou groupBy estão distribuídos de forma desigual entre partições, fazendo com que um pequeno número de tarefas processe a maioria dos dados. Isto cria hotspots que aumentam o tempo de atualização de ponta a ponta. Utilizar técnicas de clustering líquido para tratar o desvio nas tabelas armazenadas. Para o desvio que ocorre durante computação em voo, salta as chaves altamente enviesadas adicionando um sufixo de compartimento aleatório antes de agrupar e agregar num processo de duas etapas.

Para mais informações, veja Usar agrupamento de líquidos para layout de dados.

Utilize a atualização incremental para visualizações materializadas

Quando se utiliza uma vista materializada para uma grande agregação, o Lakeflow Spark Declarative Pipelines tenta atualizá-la incrementalmente — processando apenas as alterações a montante desde a última atualização em vez de recalcular o conjunto completo de resultados. A atualização incremental é significativamente mais barata do que executar a consulta do zero em cada gatilho do pipeline. Para maximizar a probabilidade de uma vista materializada poder ser atualizada incrementalmente, escreva consultas simples de agregação determinística e evite construtos que impeçam o processamento incremental, como funções não determinísticas.

Consulte Atualização incremental para ver vistas materializadas.

Otimizar uniões

Para junções em que um dos lados é uma pequena tabela de dimensões, adicione um "broadcast hint" para instruir o Spark a realizar o "broadcast" da tabela menor a todos os executores em vez de realizar uma junção por "shuffle":

SQL

CREATE OR REFRESH MATERIALIZED VIEW enriched_orders AS
SELECT o.*, /*+ BROADCAST(p) */ p.product_name, p.category
FROM orders o
JOIN products p ON o.product_id = p.product_id;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast

@dp.materialized_view
def enriched_orders():
    orders = spark.read.table("orders")
    products = spark.read.table("products")
    return orders.join(broadcast(products), "product_id")

Para uniões de proximidade em séries temporais (por exemplo, encontrar o evento mais próximo dentro de um intervalo temporal), utilize uma condição de junção por intervalo e assegure que ambos os lados tenham marcação temporal ao juntar fluxos, ou considere organizar eventos em baldes temporais antes de realizar a junção.

Monitorize os seus pipelines

O registo de eventos do pipeline é a principal primitiva de observabilidade nos pipelines declarativos do Lakeflow Spark. Cada execução de pipeline escreve registos estruturados no log de eventos, abrangendo o progresso da execução, resultados esperados de qualidade dos dados, proveniência dos dados e detalhes de erros. O registo de eventos é uma tabela Delta que podes consultar diretamente.

Para consultar o registo de eventos sem conhecer o caminho de armazenamento subjacente, use a event_log() função de valores de tabela num cluster partilhado ou num warehouse SQL:

SELECT * FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC
LIMIT 100;

Crie painéis de qualidade de dados consultando o registo de eventos para obter as métricas de expectativa. A details coluna contém uma estrutura JSON aninhada com contagens de aprovação e reprovação para cada restrição. Você pode usar esta estrutura para acompanhar as tendências de qualidade ao longo do tempo e alertar sobre possíveis regressões.

Para alertas orientados por eventos, utilize ganchos de eventos para ativar webhooks personalizados ou serviços de notificação (como Slack ou PagerDuty) quando um pipeline falha ou quando um limiar de qualidade de dados é ultrapassado. Hooks de eventos são funções Python que são executadas em resposta a eventos de pipeline.

Para mais informações, consulte Monitorizar pipelines, Registo de eventos de pipelines e Definir monitorização personalizada de pipelines com ganchos de eventos.

Use computação em ambiente serverless

Databricks recomenda computação em modelo serverless para novos pipelines. Com o serverless, não existe configuração manual do cluster — a Databricks gere a infraestrutura automaticamente. Pipelines serverless utilizam auto-escalonamento avançado que pode escalar quer horizontalmente (mais executores) quer verticalmente (executores de maior tamanho) em resposta às exigências da carga de trabalho. As pipelines serverless utilizam sempre o Unity Catalog, pelo que a governança e o acompanhamento de linhagem estão integrados por padrão.

Para mais informações, consulte Configurar um pipeline serverless.

Organizar pipelines com a arquitetura medallion

A arquitetura do medalhão organiza os dados em três camadas lógicas — bronze, prata e ouro — cada uma com um propósito distinto. Mapear os tipos de conjunto de dados do Lakeflow Spark Declarative Pipelines para a camada correta mantém as responsabilidades de cada camada limpas e facilita a manutenção dos pipelines.

  • Bronze: Use tabelas de streaming para ingerir dados brutos de armazenamento na nuvem, barramentos de mensagens ou fontes CDC. As tabelas de bronze preservam os dados brutos da fonte com transformações mínimas, tornando possível que as camadas de prata ou ouro possam voltar a processar a partir da fonte na camada de bronze se os requisitos mudarem.
  • Prata: Use tabelas de streaming para transformações incrementais ao nível das linhas (filtragem, limpeza e análise sintática). Utilize vistas materializadas quando a lógica da camada de prata envolve junções de enriquecimento contra tabelas de dimensões ou agregações complexas que beneficiam de uma atualização incremental.
  • Ouro: Utilize vistas materializadas para pré-calcular agregações, métricas e resumos servidos a painéis, ferramentas de reporte e consumidores a jusante.

Separe a ingestão (bronze) e a transformação (prata e ouro) em DAGs de pipelines distintos sempre que possível. Desacoplar as camadas permite agendar, monitorizar e solucionar problemas de cada camada de forma independente, e uma falha num pipeline de transformação não impede que novos dados cheguem à camada bronze.

Para obter mais informações, consulte Tabelas de streaming e Vistas materializadas.