Feed de dados de mudanças do Lakebase

Note

O recurso de Feed de Dados de Alteração do Lakebase está em Visualização Pública.

O que é o Feed de Dados de Alteração do Lakebase?

O Lakebase apresenta um CDF nativo, um fluxo de dados de alterações, o que libera seus dados operacionais para pipelines posteriores, modelos e aplicativos. Cada inserção, atualização e exclusão em uma tabela Postgres do Lakebase é capturada do write-ahead log (WAL) e armazenada como uma nova linha em uma tabela Delta gerenciada pelo Unity Catalog, agrupadas em lotes e descarregadas a cada ~15 segundos. O histórico de alterações é armazenado em um formato aberto que qualquer mecanismo de computação pode ler.

As tabelas de destino seguem a mesma estrutura que o Feed de Dados de Alterações Delta: cada linha contém um _pg_change_type, um LSN, um identificador de transação e um carimbo de data e hora. As alterações operacionais tornam-se uma fonte primária para ETL, auditoria e consumidores downstream — sem precisar implementar uma stack externa de CDC.

Fluxo de dados de CDF do Lakebase do Postgres, por meio do wal2delta, até as tabelas Delta no Unity Catalog.

Casos de uso

O CDF do Lakebase traz dados operacionais para o lakehouse para que pipelines downstream e aplicativos possam reagir às alterações à medida que elas acontecem.

Caso de uso Description
Pipelines de ETL Use Lakebase como fonte de bronze para pipelines de medalhão. Crie trabalhos incrementais de SDP ou de streaming estruturado do Spark com base no feed de alterações e atualize as tabelas downstream das camadas silver e gold.
Logs de auditoria Mantenha um histórico completo e consultável de cada inserção, atualização e exclusão em uma tabela do Lakebase para conformidade e perícia. O histórico é imutável, Delta.
Sistemas externos Armazene dados de alteração do Lakebase em um formato aberto que qualquer mecanismo possa consumir. Como o destino é uma tabela Delta no Catálogo do Unity, sistemas externos e leitores que não são do Databricks podem acessar o feed diretamente.

Habilitar esta visualização

Um administrador do workspace deve habilitar a visualização do Feed de Dados de Alterações do Lakebase na página Visualizações do workspace.

Requirements

  • Dimensionamento automático do Lakebase: Um projeto de dimensionamento automático do Lakebase executando o Postgres 17.
  • Banco de dados de origem: As tabelas devem residir no databricks_postgres banco de dados no Lakebase. Cada projeto é criado com esse banco de dados padrão. Essa é uma limitação conhecida.
  • Unity Catalog: A identidade que configura o CDF precisa de USE CATALOG, USE SCHEMA e CREATE TABLE no catálogo e esquema de destino. Consulte Conceder permissões em um objeto.
  • Armazenamento padrão: Não há suporte para catálogos de destino configurados com armazenamento padrão.
  • Projeto lakebase: Sua função Postgres requer permissões CAN MANAGE no projeto Lakebase. Os proprietários do projeto têm CAN MANAGE por padrão. Consulte Gerenciar permissões de projeto.
  • Tipos de dados: Consulte o mapeamento de tipo de dados. Os tipos sem um equivalente Delta direto são armazenados como STRING.

Configurar o CDF do Lakebase

Para começar, defina a identidade da réplica completa nas tabelas desejadas no feed (Etapa 1) e inicie a CDF no aplicativo Lakebase (Etapa 2). Seus dados aparecem como lb_<table_name>_history tabelas Delta no catálogo e no esquema do Unity Catalog que você escolher.

Etapa 1: Definir a identidade da réplica completa

Para que uma tabela do Lakebase participe do CDF, ela deve ter REPLICA IDENTITY FULL definido. Por padrão, o Postgres registra apenas a chave primária quando uma linha é atualizada ou excluída. Definir a identidade completa faz com que o Postgres registre, no log de pré-gravação, o estado da linha antes e depois da alteração, de que o CDF precisa para montar um histórico completo das alterações.

Você pode executar esses comandos no Editor de SQL do Lakebase ou em qualquer cliente postgres.

Tabela única

ALTER TABLE <table_name> REPLICA IDENTITY FULL;

Todas as tabelas existentes em um esquema

Para definir a identidade de réplica em todas as tabelas existentes em um esquema (public neste exemplo), execute:

DO $$
DECLARE r record;
BEGIN
  FOR r IN
    SELECT table_schema, table_name
    FROM information_schema.tables
    WHERE table_schema = 'public'
      AND table_type = 'BASE TABLE'
  LOOP
    EXECUTE format(
      'ALTER TABLE %I.%I REPLICA IDENTITY FULL;',
      r.table_schema, r.table_name
    );
  END LOOP;
END $$;

Aplicar automaticamente a tabelas futuras

Para fazer com que todas as tabelas recém-criadas recebam REPLICA IDENTITY FULLautomaticamente, instale um gatilho de evento postgres. Ele é executado após cada CREATE TABLE e define a identidade na nova tabela:

CREATE OR REPLACE FUNCTION public.set_full_replica_identity()
RETURNS event_trigger
LANGUAGE plpgsql
AS $$
DECLARE
  obj record;
BEGIN
  FOR obj IN
    SELECT * FROM pg_event_trigger_ddl_commands()
    WHERE command_tag = 'CREATE TABLE'
  LOOP
    EXECUTE format(
      'ALTER TABLE %s REPLICA IDENTITY FULL;',
      obj.object_identity
    );
  END LOOP;
END $$;

CREATE EVENT TRIGGER set_full_replica_identity_on_create
ON ddl_command_end
WHEN TAG IN ('CREATE TABLE')
EXECUTE FUNCTION public.set_full_replica_identity();

Combine o gatilho de evento com o loop na guia anterior para abranger tanto as tabelas existentes quanto as futuras em uma única configuração.

Verifique quais tabelas têm a replica identity definida

Para ver quais tabelas em um esquema têm a identidade da réplica configurada, execute:

SELECT n.nspname AS table_schema,
       c.relname AS table_name,
       CASE c.relreplident
         WHEN 'd' THEN 'default'
         WHEN 'n' THEN 'nothing'
         WHEN 'f' THEN 'full'
         WHEN 'i' THEN 'index'
       END AS replica_identity
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
  AND n.nspname = 'public'
ORDER BY n.nspname, c.relname;

Somente as linhas com replica_identity = 'full' estão prontas para CDF.

Etapa 2: Iniciar o feed de dados de alterações

O CDF do Lakebase está configurado no nível do esquema. Uma vez iniciada, todas as tabelas atuais e futuras no esquema de origem são incluídas no feed.

  1. No seu workspace do Azure Databricks, abra Lakebase Postgres no alternador de aplicativos (canto superior direito).
  2. Selecione seu projeto Lakebase e a branch que deseja usar (por exemplo, production ou main).
  3. Abra a visão geral do Branch e clique na guia Alterar Feed de Dados .
  4. Clique em Iniciar.
  5. Na caixa de diálogo de configuração:
    • Database: O padrão é databricks_postgres.
    • Esquema: Selecione o esquema postgres de origem.
    • Para o catálogo: Selecione o catálogo de destino do Unity Catalog.
    • Esquema: Selecione o esquema de catálogo do Unity de destino.
  6. Clique em Iniciar para iniciar o feed.

Visão geral da branch com a guia Feed de Dados de Alterações mostrando Start e a configuração do esquema.

As tabelas aparecem no destino como lb_<table_name>_history. Para encontrá-los, abra o Catálogo na barra lateral, navegue até o catálogo e o esquema de destino e abra a guia Tabelas .

A guia Alterar Feed de Dados no Lakebase tem duas subtarefas:

As subtarefas mostram o mapeamento e o progresso por tabela.

  • Esquemas: Lista cada esquema de origem, seu catálogo de destino e esquema no Catálogo do Unity e um status.
  • Tabelas: Lista cada tabela de origem, sua tabela de destino lb_<table_name>_history, o status (Streaming ou Snapshotting), o LSN confirmado (até onde o feed gravou no Delta, mostrado como - enquanto ainda está no snapshot inicial) e a Última atualização (última vez em que a tabela recebeu alterações).

Você também pode inspecionar o estado de feed do Postgres executando-o no Editor de SQL do Lakebase:

SELECT * FROM wal2delta.tables;

O resultado inclui table_oid, status (STREAMING ou SNAPSHOTTING), committed_lsn e last_write_time, de acordo com a tabela.

Importante

O que é wal2delta? O CDF do Lakebase é alimentado pela extensão wal2delta Postgres, que é executada dentro da computação lakebase. Ele usa a decodificação lógica para capturar alterações no WAL (write-ahead log) e as grava nas tabelas Delta do Unity Catalog.

Esquema de tabela de destino

O CDF grava uma tabela Delta para cada tabela de origem, com o nome lb_<table_name>_history em seu catálogo e esquema de destino. Além das colunas de origem, cada linha contém as seguintes colunas do sistema:

Coluna Tipo Description
_pg_change_type TEXTO Tipo de operação: insert, , delete, update_preimageou update_postimage.
_pg_lsn BIGINT Número da sequência de log do Postgres.
_pg_xid INTEGER ID da transação do Postgres.
_timestamp TIMESTAMP Carimbo de data/hora quando a alteração foi processada (sem fuso horário).
_sort_by BIGINT Chave de classificação monotônica usada para ordenar todas as alterações.

Padrões comuns de alteração

  • Snapshot inicial: Na primeira vez que o CDF é executado em uma tabela Lakebase existente, cada linha existente é registrada com _pg_change_type = 'insert'.
  • Atualizações: Uma atualização produz duas linhas: uma com _pg_change_type = 'update_preimage' (linha antiga) e outra com _pg_change_type = 'update_postimage' (nova linha).
  • Exclui: Uma exclusão produz uma linha com _pg_change_type = 'delete'.

Esses são os mesmos eventos de alteração que o Feed de Dados de Alteração Delta, portanto, os mesmos padrões downstream se aplicam.

Comportamento operacional

  • Colisões de nomes: Se duas tabelas de origem seriam mapeadas para o mesmo nome de destino (por exemplo, sales.users e marketing.users sendo ambas mapeadas para lb_users_history), o CDF grava a primeira em lb_users_history e adiciona automaticamente um sufixo à segunda, gerando lb_users_history_1. Você pode renomear qualquer tabela de destino no Catálogo do Unity e o feed continua funcionando.
  • Escopo no nível do esquema: Quando você inicia o CDF em um esquema lakebase, todas as tabelas atuais e futuras nesse esquema são incluídas. Tabelas vazias são ignoradas – uma tabela deve ter pelo menos uma linha para aparecer no destino.
  • Tabelas de origem excluídas: Se você excluir uma tabela no Lakebase, a tabela Delta de destino no Unity Catalog será preservada.

Criar pipelines posteriores

O CDF do Lakebase foi projetado para pipelines de downstream que reagem a mudanças operacionais. Os padrões abaixo mostram três maneiras de consumir o feed, ordenadas da mais simples à mais flexível.

Cenário de exemplo. Um aplicativo de comércio eletrônico registra pedidos em uma tabela Postgres orders, em que cada linha contém um item_id e quantity. A equipe de logística precisa de níveis de inventário dinâmicos. Com o CDF, cada alteração em orders é armazenada na tabela Delta lb_orders_history no Unity Catalog. Os pipelines downstream leem esse feed de alterações e atualizam uma inventory_levels tabela sempre que um pedido é colocado, editado ou cancelado.

Calcular o inventário atual com uma visão materializada

O padrão mais simples é uma exibição materializada de SQL sobre a tabela de histórico. O MV é atualizado incrementalmente à medida que novos eventos de alteração chegam e os consumidores downstream o consultam como qualquer outra tabela.

CREATE MATERIALIZED VIEW inventory_levels AS
SELECT
  item_id,
  SUM(
    CASE
      -- New orders (and the "new half" of updates) decrement inventory
      WHEN _pg_change_type IN ('insert', 'update_postimage') THEN -quantity
      -- Cancellations (and the "old half" of updates) restore inventory
      WHEN _pg_change_type IN ('delete', 'update_preimage') THEN quantity
      ELSE 0
    END
  ) AS current_inventory,
  MAX(_timestamp) AS last_transaction_ts,
  MAX(_pg_lsn) AS last_lsn
FROM lb_orders_history
GROUP BY item_id;

As duas linhas geradas para cada atualização se cancelam mutuamente, exceto pela variação líquida, portanto a soma acumulada permanece correta conforme os pedidos são editados.

Transmitir alterações com Pipelines Declarativos do Spark

Para uma arquitetura de medalhão estruturado, use o Spark Declarative Pipelines (SDP) para declarar tabelas bronze, prata e ouro. SDP os executa como um pipeline integrado, com pontos de verificação e gerenciamento de dependências gerenciados para você.

import dlt
from pyspark.sql import functions as F

@dlt.table
def inventory_adjustments():
    return (
        spark.readStream.table("<catalog>.<schema>.lb_orders_history")
        .withColumn(
            "delta",
            F.when(F.col("_pg_change_type").isin("insert", "update_postimage"), -F.col("quantity"))
             .when(F.col("_pg_change_type").isin("delete", "update_preimage"), F.col("quantity"))
             .otherwise(0),
        )
        .select("item_id", "delta", "_timestamp")
    )

@dlt.expect_or_drop("non_negative_stock", "on_hand >= 0")
@dlt.table
def inventory_levels():
    return (
        spark.read.table("LIVE.inventory_adjustments")
        .groupBy("item_id")
        .agg(F.sum("delta").alias("on_hand"))
    )

inventory_adjustmentslb_orders_history incrementalmente com readStream e produz um delta por evento. inventory_levels agrupa por item_id para calcular o estoque atual. A lógica esperada descarta linhas que deixariam o estoque negativo, sinalizando um bug em uma etapa anterior.

Para obter um passo a passo completo de ponta a ponta, consulte Tutorial: Criar um pipeline de ETL usando a captura de dados de alteração.

Processamento personalizado com streaming estruturado do Spark

Quando precisar de controle total — por exemplo, mesclagens personalizadas, efeitos colaterais ou vários destinos de saída — leia a tabela de histórico diretamente com o Spark Structured Streaming e use foreachBatch para gravar no seu destino.

from pyspark.sql import functions as F
from delta.tables import DeltaTable

def update_inventory(batch_df, batch_id):
    deltas = (
        batch_df
        .withColumn(
            "delta",
            F.when(F.col("_pg_change_type").isin("insert", "update_postimage"), -F.col("quantity"))
             .when(F.col("_pg_change_type").isin("delete", "update_preimage"), F.col("quantity"))
             .otherwise(0),
        )
        .groupBy("item_id")
        .agg(F.sum("delta").alias("delta"))
    )

    target = DeltaTable.forName(spark, "<catalog>.<schema>.inventory_levels")
    (target.alias("t")
        .merge(deltas.alias("s"), "t.item_id = s.item_id")
        .whenMatchedUpdate(set={"on_hand": F.expr("t.on_hand + s.delta")})
        .whenNotMatchedInsert(values={"item_id": "s.item_id", "on_hand": "s.delta"})
        .execute())

(spark.readStream.table("<catalog>.<schema>.lb_orders_history")
    .writeStream
    .foreachBatch(update_inventory)
    .option("checkpointLocation", "/Volumes/<catalog>/<schema>/checkpoints/inventory_levels")
    .start())

Cada microbatch agrega os eventos de alteração por item_id e incorpora os deltas líquidos resultantes em inventory_levels.

Incremental por concepção. Cada tabela lb_<table_name>_history é uma tabela Delta apenas de acréscimo. Cada alteração na origem é registrada como uma nova linha, sendo _pg_change_type o marcador da operação. Exibições materializadas do Databricks SQL, fluxos de Pipelines Declarativos do Lakeflow Spark e trabalhos de Streaming Estruturado do Spark processam novas linhas incrementalmente do log de transações Delta, portanto, os pipelines downstream só funcionam proporcionalmente ao que mudou. Você não precisa habilitar o Delta Change Data Feed na tabela de histórico, porque a semântica da alteração já está codificada nos dados da linha.

Mapeamento de tipo de dados

A CDF dá suporte à maioria dos tipos primitivos postgreSQL padrão. Os tipos sem um equivalente Delta direto são armazenados como STRING.

Tipo postgreSQL tipo Delta do Azure Databricks Anotações
BOOLEAN BOOLEAN
INT, SMALLINT, BIGINT INT, SMALLINT, BIGINT
TEXT, VARCHAR, CHAR STRING
JSONB STRING Armazenado como uma cadeia de caracteres JSON.
ENUM STRING Armazenado como o rótulo do enum.
NUMÉRICO / DECIMAL DECIMAL ou STRING Usa a precisão/escala de origem quando possível. Executa o redimensionamento sem perdas para valores de precisão/escala incompatíveis. Retorna STRING quando a precisão excede 38 ou quando a precisão ou a escala são indefinidas (NUMERIC não limitado). Todas as colunas NUMERIC/DECIMAL são anuláveis porque os valores naN são mapeados para NULL. Consulte os tipos numéricos do PostgreSQL.
DATE DATE
TIMESTAMP TIMESTAMP_NTZ
TIMESTAMPTZ TIMESTAMP
FLOAT, DOUBLE FLOAT, DOUBLE

Tipos armazenados como STRING:

  • Geography/Geometry (PostGIS): Tipos da extensão PostGIS (por exemplo, geometry, ). geography
  • Vetor (pgvector): O tipo vector da extensão pgvector.
  • Tipos compostos/struct: Tipos personalizados definidos com CREATE TYPE ... AS (field_name type, ...). Estes são tipos semelhantes a linhas com campos nomeados.
  • Mapa: Tipos de chave-valor do tipo mapa, como hstore (da extensão hstore). O Postgres não tem nenhum tipo de mapa interno. hstore é a maneira comum de armazenar pares chave-valor em uma coluna.

Gerenciando alterações de esquema

  • Renomear uma tabela no Postgres (por exemplo) ALTER TABLE users RENAME TO customerspermite que o feed continue. O nome da tabela Delta de destino não é alterado – ele permanece lb_users_history.
  • As alterações de esquema (adicionando uma coluna, descartando uma coluna ou alterando o tipo de dados de uma coluna) disparam um novo instantâneo da tabela afetada. O CDF lê novamente a tabela inteira do Postgres e a reescreve para a tabela Delta de destino.

Desabilitar o CDF do Lakebase

Desativar o CDF interrompe o feed de todos os esquemas do Lakebase no projeto.

  1. No seu workspace do Azure Databricks, abra Lakebase Postgres a partir do seletor de aplicativos (canto superior direito).
  2. Selecione o projeto do Lakebase e a ramificação em que você configurou o CDF.
  3. Abra a visão geral do Branch e clique na guia Alterar Feed de Dados .
  4. Clique em Desabilitar. Na caixa de diálogo de confirmação, examine o aviso de que as alterações interromperão o fluxo para tabelas Delta e clique em Desabilitar novamente para confirmar.

Desabilitar CDF não reinicia sua computação.

Warning

Se você reativar o CDF mais tarde, o sistema não executará um novo snapshot completo. Todas as alterações que ocorreram durante a desabilitação do CDF estão permanentemente ausentes das tabelas Delta de destino.

Limitações e solução de problemas

Você pode ver o status por tabela (instantâneo, ignorado ou streaming) na guia Alterar Feed de Dados ou executando-o no Lakebase:

SELECT * FROM wal2delta.tables;

Motivos comuns de uma tabela não aparecer no feed:

  • REPLICA IDENTITY FULL não definido: Execute ALTER TABLE <table_name> REPLICA IDENTITY FULL; para a tabela. Consulte a Etapa 1: Definir a identidade da réplica completa.
  • Tabelas particionadas: Não há suporte para tabelas particionadas do Lakebase. Um esquema que contém tabelas particionadas faz com que essas tabelas falhem.
  • Tabelas vazias: Uma tabela sem linhas é ignorada até que exista pelo menos uma linha.

Próximas Etapas