Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Note
O recurso de Feed de Dados de Alteração do Lakebase está em Visualização Pública.
O que é o Feed de Dados de Alteração do Lakebase?
O Lakebase apresenta um CDF nativo, um fluxo de dados de alterações, o que libera seus dados operacionais para pipelines posteriores, modelos e aplicativos. Cada inserção, atualização e exclusão em uma tabela Postgres do Lakebase é capturada do write-ahead log (WAL) e armazenada como uma nova linha em uma tabela Delta gerenciada pelo Unity Catalog, agrupadas em lotes e descarregadas a cada ~15 segundos. O histórico de alterações é armazenado em um formato aberto que qualquer mecanismo de computação pode ler.
As tabelas de destino seguem a mesma estrutura que o Feed de Dados de Alterações Delta: cada linha contém um _pg_change_type, um LSN, um identificador de transação e um carimbo de data e hora. As alterações operacionais tornam-se uma fonte primária para ETL, auditoria e consumidores downstream — sem precisar implementar uma stack externa de CDC.
Casos de uso
O CDF do Lakebase traz dados operacionais para o lakehouse para que pipelines downstream e aplicativos possam reagir às alterações à medida que elas acontecem.
| Caso de uso | Description |
|---|---|
| Pipelines de ETL | Use Lakebase como fonte de bronze para pipelines de medalhão. Crie trabalhos incrementais de SDP ou de streaming estruturado do Spark com base no feed de alterações e atualize as tabelas downstream das camadas silver e gold. |
| Logs de auditoria | Mantenha um histórico completo e consultável de cada inserção, atualização e exclusão em uma tabela do Lakebase para conformidade e perícia. O histórico é imutável, Delta. |
| Sistemas externos | Armazene dados de alteração do Lakebase em um formato aberto que qualquer mecanismo possa consumir. Como o destino é uma tabela Delta no Catálogo do Unity, sistemas externos e leitores que não são do Databricks podem acessar o feed diretamente. |
Habilitar esta visualização
Um administrador do workspace deve habilitar a visualização do Feed de Dados de Alterações do Lakebase na página Visualizações do workspace.
Requirements
- Dimensionamento automático do Lakebase: Um projeto de dimensionamento automático do Lakebase executando o Postgres 17.
-
Banco de dados de origem: As tabelas devem residir no
databricks_postgresbanco de dados no Lakebase. Cada projeto é criado com esse banco de dados padrão. Essa é uma limitação conhecida. - Unity Catalog: A identidade que configura o CDF precisa de USE CATALOG, USE SCHEMA e CREATE TABLE no catálogo e esquema de destino. Consulte Conceder permissões em um objeto.
- Armazenamento padrão: Não há suporte para catálogos de destino configurados com armazenamento padrão.
- Projeto lakebase: Sua função Postgres requer permissões CAN MANAGE no projeto Lakebase. Os proprietários do projeto têm CAN MANAGE por padrão. Consulte Gerenciar permissões de projeto.
- Tipos de dados: Consulte o mapeamento de tipo de dados. Os tipos sem um equivalente Delta direto são armazenados como STRING.
Configurar o CDF do Lakebase
Para começar, defina a identidade da réplica completa nas tabelas desejadas no feed (Etapa 1) e inicie a CDF no aplicativo Lakebase (Etapa 2). Seus dados aparecem como lb_<table_name>_history tabelas Delta no catálogo e no esquema do Unity Catalog que você escolher.
Etapa 1: Definir a identidade da réplica completa
Para que uma tabela do Lakebase participe do CDF, ela deve ter REPLICA IDENTITY FULL definido. Por padrão, o Postgres registra apenas a chave primária quando uma linha é atualizada ou excluída. Definir a identidade completa faz com que o Postgres registre, no log de pré-gravação, o estado da linha antes e depois da alteração, de que o CDF precisa para montar um histórico completo das alterações.
Você pode executar esses comandos no Editor de SQL do Lakebase ou em qualquer cliente postgres.
Tabela única
ALTER TABLE <table_name> REPLICA IDENTITY FULL;
Todas as tabelas existentes em um esquema
Para definir a identidade de réplica em todas as tabelas existentes em um esquema (public neste exemplo), execute:
DO $$
DECLARE r record;
BEGIN
FOR r IN
SELECT table_schema, table_name
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_type = 'BASE TABLE'
LOOP
EXECUTE format(
'ALTER TABLE %I.%I REPLICA IDENTITY FULL;',
r.table_schema, r.table_name
);
END LOOP;
END $$;
Aplicar automaticamente a tabelas futuras
Para fazer com que todas as tabelas recém-criadas recebam REPLICA IDENTITY FULLautomaticamente, instale um gatilho de evento postgres. Ele é executado após cada CREATE TABLE e define a identidade na nova tabela:
CREATE OR REPLACE FUNCTION public.set_full_replica_identity()
RETURNS event_trigger
LANGUAGE plpgsql
AS $$
DECLARE
obj record;
BEGIN
FOR obj IN
SELECT * FROM pg_event_trigger_ddl_commands()
WHERE command_tag = 'CREATE TABLE'
LOOP
EXECUTE format(
'ALTER TABLE %s REPLICA IDENTITY FULL;',
obj.object_identity
);
END LOOP;
END $$;
CREATE EVENT TRIGGER set_full_replica_identity_on_create
ON ddl_command_end
WHEN TAG IN ('CREATE TABLE')
EXECUTE FUNCTION public.set_full_replica_identity();
Combine o gatilho de evento com o loop na guia anterior para abranger tanto as tabelas existentes quanto as futuras em uma única configuração.
Verifique quais tabelas têm a replica identity definida
Para ver quais tabelas em um esquema têm a identidade da réplica configurada, execute:
SELECT n.nspname AS table_schema,
c.relname AS table_name,
CASE c.relreplident
WHEN 'd' THEN 'default'
WHEN 'n' THEN 'nothing'
WHEN 'f' THEN 'full'
WHEN 'i' THEN 'index'
END AS replica_identity
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
AND n.nspname = 'public'
ORDER BY n.nspname, c.relname;
Somente as linhas com replica_identity = 'full' estão prontas para CDF.
Etapa 2: Iniciar o feed de dados de alterações
O CDF do Lakebase está configurado no nível do esquema. Uma vez iniciada, todas as tabelas atuais e futuras no esquema de origem são incluídas no feed.
- No seu workspace do Azure Databricks, abra Lakebase Postgres no alternador de aplicativos (canto superior direito).
- Selecione seu projeto Lakebase e a branch que deseja usar (por exemplo, production ou main).
- Abra a visão geral do Branch e clique na guia Alterar Feed de Dados .
- Clique em Iniciar.
- Na caixa de diálogo de configuração:
-
Database: O padrão é
databricks_postgres. - Esquema: Selecione o esquema postgres de origem.
- Para o catálogo: Selecione o catálogo de destino do Unity Catalog.
- Esquema: Selecione o esquema de catálogo do Unity de destino.
-
Database: O padrão é
- Clique em Iniciar para iniciar o feed.
As tabelas aparecem no destino como lb_<table_name>_history. Para encontrá-los, abra o Catálogo na barra lateral, navegue até o catálogo e o esquema de destino e abra a guia Tabelas .
A guia Alterar Feed de Dados no Lakebase tem duas subtarefas:
- Esquemas: Lista cada esquema de origem, seu catálogo de destino e esquema no Catálogo do Unity e um status.
-
Tabelas: Lista cada tabela de origem, sua tabela de destino
lb_<table_name>_history, o status (StreamingouSnapshotting), o LSN confirmado (até onde o feed gravou no Delta, mostrado como-enquanto ainda está no snapshot inicial) e a Última atualização (última vez em que a tabela recebeu alterações).
Você também pode inspecionar o estado de feed do Postgres executando-o no Editor de SQL do Lakebase:
SELECT * FROM wal2delta.tables;
O resultado inclui table_oid, status (STREAMING ou SNAPSHOTTING), committed_lsn e last_write_time, de acordo com a tabela.
Importante
O que é wal2delta? O CDF do Lakebase é alimentado pela extensão wal2delta Postgres, que é executada dentro da computação lakebase. Ele usa a decodificação lógica para capturar alterações no WAL (write-ahead log) e as grava nas tabelas Delta do Unity Catalog.
Esquema de tabela de destino
O CDF grava uma tabela Delta para cada tabela de origem, com o nome lb_<table_name>_history em seu catálogo e esquema de destino. Além das colunas de origem, cada linha contém as seguintes colunas do sistema:
| Coluna | Tipo | Description |
|---|---|---|
_pg_change_type |
TEXTO | Tipo de operação: insert, , delete, update_preimageou update_postimage. |
_pg_lsn |
BIGINT | Número da sequência de log do Postgres. |
_pg_xid |
INTEGER | ID da transação do Postgres. |
_timestamp |
TIMESTAMP | Carimbo de data/hora quando a alteração foi processada (sem fuso horário). |
_sort_by |
BIGINT | Chave de classificação monotônica usada para ordenar todas as alterações. |
Padrões comuns de alteração
-
Snapshot inicial: Na primeira vez que o CDF é executado em uma tabela Lakebase existente, cada linha existente é registrada com
_pg_change_type = 'insert'. -
Atualizações: Uma atualização produz duas linhas: uma com
_pg_change_type = 'update_preimage'(linha antiga) e outra com_pg_change_type = 'update_postimage'(nova linha). -
Exclui: Uma exclusão produz uma linha com
_pg_change_type = 'delete'.
Esses são os mesmos eventos de alteração que o Feed de Dados de Alteração Delta, portanto, os mesmos padrões downstream se aplicam.
Comportamento operacional
-
Colisões de nomes: Se duas tabelas de origem seriam mapeadas para o mesmo nome de destino (por exemplo,
sales.usersemarketing.userssendo ambas mapeadas paralb_users_history), o CDF grava a primeira emlb_users_historye adiciona automaticamente um sufixo à segunda, gerandolb_users_history_1. Você pode renomear qualquer tabela de destino no Catálogo do Unity e o feed continua funcionando. - Escopo no nível do esquema: Quando você inicia o CDF em um esquema lakebase, todas as tabelas atuais e futuras nesse esquema são incluídas. Tabelas vazias são ignoradas – uma tabela deve ter pelo menos uma linha para aparecer no destino.
- Tabelas de origem excluídas: Se você excluir uma tabela no Lakebase, a tabela Delta de destino no Unity Catalog será preservada.
Criar pipelines posteriores
O CDF do Lakebase foi projetado para pipelines de downstream que reagem a mudanças operacionais. Os padrões abaixo mostram três maneiras de consumir o feed, ordenadas da mais simples à mais flexível.
Cenário de exemplo. Um aplicativo de comércio eletrônico registra pedidos em uma tabela Postgres orders, em que cada linha contém um item_id e quantity. A equipe de logística precisa de níveis de inventário dinâmicos. Com o CDF, cada alteração em orders é armazenada na tabela Delta lb_orders_history no Unity Catalog. Os pipelines downstream leem esse feed de alterações e atualizam uma inventory_levels tabela sempre que um pedido é colocado, editado ou cancelado.
Calcular o inventário atual com uma visão materializada
O padrão mais simples é uma exibição materializada de SQL sobre a tabela de histórico. O MV é atualizado incrementalmente à medida que novos eventos de alteração chegam e os consumidores downstream o consultam como qualquer outra tabela.
CREATE MATERIALIZED VIEW inventory_levels AS
SELECT
item_id,
SUM(
CASE
-- New orders (and the "new half" of updates) decrement inventory
WHEN _pg_change_type IN ('insert', 'update_postimage') THEN -quantity
-- Cancellations (and the "old half" of updates) restore inventory
WHEN _pg_change_type IN ('delete', 'update_preimage') THEN quantity
ELSE 0
END
) AS current_inventory,
MAX(_timestamp) AS last_transaction_ts,
MAX(_pg_lsn) AS last_lsn
FROM lb_orders_history
GROUP BY item_id;
As duas linhas geradas para cada atualização se cancelam mutuamente, exceto pela variação líquida, portanto a soma acumulada permanece correta conforme os pedidos são editados.
Transmitir alterações com Pipelines Declarativos do Spark
Para uma arquitetura de medalhão estruturado, use o Spark Declarative Pipelines (SDP) para declarar tabelas bronze, prata e ouro. SDP os executa como um pipeline integrado, com pontos de verificação e gerenciamento de dependências gerenciados para você.
import dlt
from pyspark.sql import functions as F
@dlt.table
def inventory_adjustments():
return (
spark.readStream.table("<catalog>.<schema>.lb_orders_history")
.withColumn(
"delta",
F.when(F.col("_pg_change_type").isin("insert", "update_postimage"), -F.col("quantity"))
.when(F.col("_pg_change_type").isin("delete", "update_preimage"), F.col("quantity"))
.otherwise(0),
)
.select("item_id", "delta", "_timestamp")
)
@dlt.expect_or_drop("non_negative_stock", "on_hand >= 0")
@dlt.table
def inventory_levels():
return (
spark.read.table("LIVE.inventory_adjustments")
.groupBy("item_id")
.agg(F.sum("delta").alias("on_hand"))
)
inventory_adjustments lê lb_orders_history incrementalmente com readStream e produz um delta por evento.
inventory_levels agrupa por item_id para calcular o estoque atual. A lógica esperada descarta linhas que deixariam o estoque negativo, sinalizando um bug em uma etapa anterior.
Para obter um passo a passo completo de ponta a ponta, consulte Tutorial: Criar um pipeline de ETL usando a captura de dados de alteração.
Processamento personalizado com streaming estruturado do Spark
Quando precisar de controle total — por exemplo, mesclagens personalizadas, efeitos colaterais ou vários destinos de saída — leia a tabela de histórico diretamente com o Spark Structured Streaming e use foreachBatch para gravar no seu destino.
from pyspark.sql import functions as F
from delta.tables import DeltaTable
def update_inventory(batch_df, batch_id):
deltas = (
batch_df
.withColumn(
"delta",
F.when(F.col("_pg_change_type").isin("insert", "update_postimage"), -F.col("quantity"))
.when(F.col("_pg_change_type").isin("delete", "update_preimage"), F.col("quantity"))
.otherwise(0),
)
.groupBy("item_id")
.agg(F.sum("delta").alias("delta"))
)
target = DeltaTable.forName(spark, "<catalog>.<schema>.inventory_levels")
(target.alias("t")
.merge(deltas.alias("s"), "t.item_id = s.item_id")
.whenMatchedUpdate(set={"on_hand": F.expr("t.on_hand + s.delta")})
.whenNotMatchedInsert(values={"item_id": "s.item_id", "on_hand": "s.delta"})
.execute())
(spark.readStream.table("<catalog>.<schema>.lb_orders_history")
.writeStream
.foreachBatch(update_inventory)
.option("checkpointLocation", "/Volumes/<catalog>/<schema>/checkpoints/inventory_levels")
.start())
Cada microbatch agrega os eventos de alteração por item_id e incorpora os deltas líquidos resultantes em inventory_levels.
Incremental por concepção. Cada tabela lb_<table_name>_history é uma tabela Delta apenas de acréscimo. Cada alteração na origem é registrada como uma nova linha, sendo _pg_change_type o marcador da operação.
Exibições materializadas do Databricks SQL, fluxos de Pipelines Declarativos do Lakeflow Spark e trabalhos de Streaming Estruturado do Spark processam novas linhas incrementalmente do log de transações Delta, portanto, os pipelines downstream só funcionam proporcionalmente ao que mudou. Você não precisa habilitar o Delta Change Data Feed na tabela de histórico, porque a semântica da alteração já está codificada nos dados da linha.
Mapeamento de tipo de dados
A CDF dá suporte à maioria dos tipos primitivos postgreSQL padrão. Os tipos sem um equivalente Delta direto são armazenados como STRING.
| Tipo postgreSQL | tipo Delta do Azure Databricks | Anotações |
|---|---|---|
| BOOLEAN | BOOLEAN | |
| INT, SMALLINT, BIGINT | INT, SMALLINT, BIGINT | |
| TEXT, VARCHAR, CHAR | STRING | |
| JSONB | STRING | Armazenado como uma cadeia de caracteres JSON. |
| ENUM | STRING | Armazenado como o rótulo do enum. |
| NUMÉRICO / DECIMAL | DECIMAL ou STRING | Usa a precisão/escala de origem quando possível. Executa o redimensionamento sem perdas para valores de precisão/escala incompatíveis. Retorna STRING quando a precisão excede 38 ou quando a precisão ou a escala são indefinidas (NUMERIC não limitado). Todas as colunas NUMERIC/DECIMAL são anuláveis porque os valores naN são mapeados para NULL. Consulte os tipos numéricos do PostgreSQL. |
| DATE | DATE | |
| TIMESTAMP | TIMESTAMP_NTZ | |
| TIMESTAMPTZ | TIMESTAMP | |
| FLOAT, DOUBLE | FLOAT, DOUBLE |
Tipos armazenados como STRING:
-
Geography/Geometry (PostGIS): Tipos da extensão PostGIS (por exemplo,
geometry, ).geography -
Vetor (pgvector): O tipo
vectorda extensão pgvector. -
Tipos compostos/struct: Tipos personalizados definidos com
CREATE TYPE ... AS (field_name type, ...). Estes são tipos semelhantes a linhas com campos nomeados. -
Mapa: Tipos de chave-valor do tipo mapa, como hstore (da extensão
hstore). O Postgres não tem nenhum tipo de mapa interno.hstoreé a maneira comum de armazenar pares chave-valor em uma coluna.
Gerenciando alterações de esquema
-
Renomear uma tabela no Postgres (por exemplo)
ALTER TABLE users RENAME TO customerspermite que o feed continue. O nome da tabela Delta de destino não é alterado – ele permanecelb_users_history. - As alterações de esquema (adicionando uma coluna, descartando uma coluna ou alterando o tipo de dados de uma coluna) disparam um novo instantâneo da tabela afetada. O CDF lê novamente a tabela inteira do Postgres e a reescreve para a tabela Delta de destino.
Desabilitar o CDF do Lakebase
Desativar o CDF interrompe o feed de todos os esquemas do Lakebase no projeto.
- No seu workspace do Azure Databricks, abra Lakebase Postgres a partir do seletor de aplicativos (canto superior direito).
- Selecione o projeto do Lakebase e a ramificação em que você configurou o CDF.
- Abra a visão geral do Branch e clique na guia Alterar Feed de Dados .
- Clique em Desabilitar. Na caixa de diálogo de confirmação, examine o aviso de que as alterações interromperão o fluxo para tabelas Delta e clique em Desabilitar novamente para confirmar.
Desabilitar CDF não reinicia sua computação.
Warning
Se você reativar o CDF mais tarde, o sistema não executará um novo snapshot completo. Todas as alterações que ocorreram durante a desabilitação do CDF estão permanentemente ausentes das tabelas Delta de destino.
Limitações e solução de problemas
Você pode ver o status por tabela (instantâneo, ignorado ou streaming) na guia Alterar Feed de Dados ou executando-o no Lakebase:
SELECT * FROM wal2delta.tables;
Motivos comuns de uma tabela não aparecer no feed:
-
REPLICA IDENTITY FULLnão definido: ExecuteALTER TABLE <table_name> REPLICA IDENTITY FULL;para a tabela. Consulte a Etapa 1: Definir a identidade da réplica completa. - Tabelas particionadas: Não há suporte para tabelas particionadas do Lakebase. Um esquema que contém tabelas particionadas faz com que essas tabelas falhem.
- Tabelas vazias: Uma tabela sem linhas é ignorada até que exista pelo menos uma linha.
Próximas Etapas
- Crie ETL incremental com Pipelines Declarativos do Apache Spark. Confira o Tutorial: Criar um pipeline de ETL usando a captura de dados de alteração para obter um passo a passo completo.
- Consulte a camada bronze com o SQL do Databricks. Veja Introdução ao data warehousing usando o DATAbricks SQL.
- Histórico de auditoria com consultas de viagem no tempo nas tabelas Delta de destino.