Lakebase Änderungsdaten-Feed

Note

Das Feature "Lakebase Change Data Feed" befindet sich in der öffentlichen Vorschau.

Was ist Lakebase Change Data Feed?

Lakebase führt einen nativen Change Data Feed (CDF) ein und entsperrt Ihre Betriebsdaten für nachgeschaltete Pipelines, Modelle und Anwendungen. Jeder Einfüge-, Aktualisierungs- und Löschvorgang in einer Lakebase-Postgres-Tabelle wird aus dem Write-Ahead-Log erfasst und als neue Zeile in einer von Unity Catalog verwalteten Delta-Tabelle gespeichert, in Batches verarbeitet und etwa alle 15 Sekunden geschrieben. Der Änderungsverlauf wird in einem geöffneten Format gespeichert, das jedes Computemodul lesen kann.

Die Zieltabellen folgen demselben Shape wie der Delta Change Data Feed: Jede Zeile trägt einen _pg_change_typeLSN, eine Transaktions-ID und einen Zeitstempel. Operative Änderungen werden zu einer vollwertigen Quelle für ETL, Audit und nachgelagerte Consumer — ohne einen externen CDC-Stack aufsetzen zu müssen.

Lakebase CDF-Datenfluss von Postgres bis wal2delta zu Delta-Tabellen im Unity-Katalog.

Anwendungsfälle

Lakebase CDF bringt Betriebsdaten in das Seehaus ein, sodass nachgelagerte Pipelines und Anwendungen auf Änderungen reagieren können, sobald sie geschehen.

Anwendungsfall Description
ETL-Pipelines Verwenden Sie Lakebase als Bronze-Quelle für Medallion-Pipelines. Erstellen Sie inkrementelle SDP- oder Spark Structured Streaming-Aufträge gegen den Änderungsfeed und aktualisieren Sie nachgelagerte Silver- und Gold-Tabellen.
Überwachungsprotokolle Verwalten Sie einen vollständigen, abfragbaren Verlauf jedes Einfügens, Aktualisierens und Löschens in einer Lakebase-Tabelle für Compliance und Forensik. Die Delta-Historie ist unveränderbar.
Externe Systeme Speichern Sie Lakebase-Änderungsdaten in einem offenen Format, das jedes Modul nutzen kann. Da es sich bei dem Ziel um eine Delta-Tabelle im Unity-Katalog handelt, können externe Systeme und Nicht-Databricks-Leser direkt auf den Feed zugreifen.

Diese Vorschau aktivieren

Ein Arbeitsbereichsadministrator muss die Vorschau für Lakebase Change Data Feed auf der Vorschauseite des Arbeitsbereichs aktivieren.

Requirements

  • Lakebase Autoscaling: Ein Lakebase Autocaling-Projekt mit Postgres 17.
  • Quelldatenbank: Tabellen müssen sich in der databricks_postgres Datenbank in Lakebase befinden. Jedes Projekt wird mit dieser Standarddatenbank erstellt. Dies ist eine bekannte Einschränkung.
  • Unity-Katalog: Die Identität, die CDF konfiguriert, benötigt USE CATALOG, USE SCHEMAund CREATE TABLE im Zielkatalog und -schema. Siehe Erteilen von Berechtigungen für ein Objekt.
  • Standardspeicher: Zielkataloge, die mit Standardspeicher konfiguriert sind, werden nicht unterstützt.
  • Lakebase-Projekt: Ihre Postgres-Rolle erfordert CAN MANAGE-Berechtigungen für das Lakebase-Projekt. Projektinhaber verfügen standardmäßig über CAN MANAGE. Siehe "Projektberechtigungen verwalten".
  • Datentypen: Siehe Datentypzuordnung. Typen ohne direkte Delta-Entsprechung werden als STRING gespeichert.

Einrichten des Lakebase CDF

Um zu beginnen, legen Sie die Replikatidentität vollständig für die Tabellen fest, die Sie im Feed (Schritt 1) benötigen, und starten Sie dann CDF in der Lakebase-App (Schritt 2). Ihre Daten werden als lb_<table_name>_history Delta-Tabellen im Unity-Katalogkatalog und im von Ihnen ausgewählten Schema angezeigt.

Schritt 1: Festlegen der Replikatidentität vollständig

Damit eine Lakebase-Tabelle an CDF teilnehmen kann, muss für sie REPLICA IDENTITY FULL festgelegt sein. Standardmäßig protokolliert Postgres nur den Primärschlüssel, wenn eine Zeile aktualisiert oder gelöscht wird. Das Festlegen der vollen Replikatidentität veranlasst Postgres, den Zustand der Zeile vor und nach der Änderung im Write-Ahead-Log zu speichern, den CDF benötigt, um eine vollständige Änderungshistorie zu erstellen.

Sie können diese Befehle im Lakebase SQL-Editor oder einem beliebigen Postgres-Client ausführen.

Einzelne Tabelle

ALTER TABLE <table_name> REPLICA IDENTITY FULL;

Alle vorhandenen Tabellen in einem Schema

Führen Sie Folgendes aus, um die Replikatidentität für jede vorhandene Tabelle in einem Schemapublic (in diesem Beispiel) festzulegen:

DO $$
DECLARE r record;
BEGIN
  FOR r IN
    SELECT table_schema, table_name
    FROM information_schema.tables
    WHERE table_schema = 'public'
      AND table_type = 'BASE TABLE'
  LOOP
    EXECUTE format(
      'ALTER TABLE %I.%I REPLICA IDENTITY FULL;',
      r.table_schema, r.table_name
    );
  END LOOP;
END $$;

Automatisches Anwenden auf zukünftige Tabellen

Damit jede neu erstellte Tabelle automatisch empfangen wird REPLICA IDENTITY FULL, installieren Sie einen Postgres-Ereignistrigger. Wird nach jedem CREATE TABLE ausgeführt und legt die Identität für die neue Tabelle fest:

CREATE OR REPLACE FUNCTION public.set_full_replica_identity()
RETURNS event_trigger
LANGUAGE plpgsql
AS $$
DECLARE
  obj record;
BEGIN
  FOR obj IN
    SELECT * FROM pg_event_trigger_ddl_commands()
    WHERE command_tag = 'CREATE TABLE'
  LOOP
    EXECUTE format(
      'ALTER TABLE %s REPLICA IDENTITY FULL;',
      obj.object_identity
    );
  END LOOP;
END $$;

CREATE EVENT TRIGGER set_full_replica_identity_on_create
ON ddl_command_end
WHEN TAG IN ('CREATE TABLE')
EXECUTE FUNCTION public.set_full_replica_identity();

Kombinieren Sie den Ereignistrigger mit der Schleife auf der vorherigen Registerkarte, um vorhandene und zukünftige Tabellen in einem Setup abzudecken.

Überprüfen, bei welchen Tabellen die Replikatidentität festgelegt ist

Um zu sehen, welche Tabellen in einem Schema die Replikatidentität konfiguriert haben, führen Sie Folgendes aus:

SELECT n.nspname AS table_schema,
       c.relname AS table_name,
       CASE c.relreplident
         WHEN 'd' THEN 'default'
         WHEN 'n' THEN 'nothing'
         WHEN 'f' THEN 'full'
         WHEN 'i' THEN 'index'
       END AS replica_identity
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
  AND n.nspname = 'public'
ORDER BY n.nspname, c.relname;

Nur Zeilen mit replica_identity = 'full' sind für CDF bereit.

Schritt 2: Starten des Änderungsdatenfeeds

Lakebase CDF ist auf Schemaebene konfiguriert. Nach dem Start ist jede aktuelle und zukünftige Tabelle im Quellschema im Feed enthalten.

  1. Öffnen Sie in Ihrem Azure Databricks Arbeitsbereich Lakebase Postgres über den App-Switcher (oben rechts).
  2. Wählen Sie Ihr Lakebase-Projekt und den Zweig aus, den Sie verwenden möchten (z. B. Produktion oder Haupt).
  3. Öffnen Sie die Branch-Übersicht und klicken Sie auf die Registerkarte „Change Data Feed“.
  4. Klicken Sie auf Start.
  5. Im Konfigurationsdialogfeld:
    • Datenbank: Standardmäßig auf databricks_postgres gesetzt.
    • Schema: Wählen Sie das Quell-Postgres-Schema aus.
    • Zu Katalog: Wählen Sie den Zielkatalog in Unity Catalog aus.
    • Schema: Wählen Sie das Zielschema des Unity-Katalogs aus.
  6. Klicken Sie auf "Start" , um den Feed zu beginnen.

Branch-Übersicht mit der Registerkarte „Change Data Feed“, die Start- und Schemakonfiguration anzeigt.

Tabellen erscheinen im Zieltext als lb_<table_name>_history. Um sie zu finden, öffnen Sie den Katalog in der Randleiste, navigieren Sie zum Zielkatalog und zum Schema, und öffnen Sie die Registerkarte "Tabellen ".

Die Registerkarte " Datenfeed ändern" in Lakebase enthält zwei Unterregisterkarten:

Untertabs zeigen die Zuordnung sowie den Fortschritt je Tabelle an.

  • Schemas: Listet jedes Quellschema, seinen Zielkatalog und sein Schema im Unity-Katalog und einen Status auf.
  • Tabellen: Listet jede Quelltabelle, die zugehörige lb_<table_name>_history Zieltabelle, den Status (Streaming oder Snapshotting), die Bestätigte LSN (wie weit der Feed nach Delta geschrieben hat, angezeigt als -, solange noch die anfängliche Momentaufnahme läuft) und Letzte Aktualisierung (der letzte Zeitpunkt, zu dem die Tabelle Änderungen erhalten hat) auf.

Sie können den Feedstatus auch von Postgres überprüfen, indem Sie diesen im Lakebase SQL-Editor ausführen:

SELECT * FROM wal2delta.tables;

Das Ergebnis enthält table_oid, status (STREAMING oder SNAPSHOTTING), committed_lsnund last_write_time pro Tabelle.

Important

Was ist wal2delta? Lakebase CDF wird von der Wal2delta Postgres Erweiterung unterstützt, die innerhalb der Lakebase Compute läuft. Es verwendet die logische Dekodierung, um Änderungen aus dem Write-Ahead Log (WAL) zu erfassen und sie in Delta-Tabellen im Unity Catalog zu schreiben.

Zieltabellenschema

CDF schreibt eine Delta-Tabelle pro Quelltabelle, die in Ihrem Zielkatalog und Schema benannt ist lb_<table_name>_history . Neben Ihren Quellspalten enthält jede Zeile die folgenden Systemspalten:

Kolumne Typ Description
_pg_change_type TEXT Vorgangstyp: insert, delete, , update_preimageoder update_postimage.
_pg_lsn BIGINT Postgres Log Sequence Number.
_pg_xid INTEGER Postgres Transaction ID.
_timestamp TIMESTAMP Zeitstempel, als die Änderung verarbeitet wurde (ohne Zeitzone).
_sort_by BIGINT Monotoner Sortierschlüssel, der zum Sortieren aller Änderungen verwendet wird.

Allgemeine Änderungsmuster

  • Anfängliche Momentaufnahme: Wenn CDF zum ersten Mal auf einer vorhandenen Lakebase-Tabelle ausgeführt wird, wird jede vorhandene Zeile mit _pg_change_type = 'insert'geschrieben.
  • Updates: Eine Aktualisierung erzeugt zwei Zeilen: eine mit _pg_change_type = 'update_preimage' (alte Zeile) und eine mit _pg_change_type = 'update_postimage' (neue Zeile).
  • Löscht: Ein Löschvorgang erzeugt eine Zeile mit _pg_change_type = 'delete'.

Hierbei handelt es sich um die gleichen Änderungsereignisse wie delta Change Data Feed, sodass dieselben downstream-Muster angewendet werden.

Betriebsverhalten

  • Benennungskonflikte: Wenn zwei Quelltabellen demselben Zielnamen zugeordnet würden (zum Beispiel, wenn sales.users und marketing.users beide lb_users_history zugeordnet würden), schreibt CDF die erste nach lb_users_history und hängt an die zweite automatisch ein Suffix an, sodass sie zu lb_users_history_1 wird. Sie können eine der Zieltabellen im Unity-Katalog umbenennen, und der Feed funktioniert weiterhin.
  • Bereich auf Schemaebene: Wenn Sie CDF für ein Lakebase-Schema starten, ist jede aktuelle und zukünftige Tabelle in diesem Schema enthalten. Leere Tabellen werden übersprungen – eine Tabelle muss mindestens eine Zeile enthalten, damit sie im Ziel angezeigt wird.
  • Abgelegte Quelltabellen: Wenn Sie eine Tabelle in Lakebase ablegen, wird die Zieldelta-Tabelle im Unity-Katalog beibehalten.

Erstellen nachgeschalteter Pipelines

Lakebase CDF ist für nachgelagerte Pipelines konzipiert, die auf betriebsbedingte Änderungen reagieren. Die folgenden Muster zeigen drei Möglichkeiten, den Feed zu nutzen, sortiert von der einfachsten bis zur flexibelsten.

Beispielszenario. Eine E-Commerce-App erfasst Bestellungen in einer Postgres-Tabelle orders, wobei jede Zeile ein item_id und ein quantity enthält. Das Logistikteam benötigt Live-Lagerbestände. Mit CDF wird jede Änderung an orders in der Delta-Tabelle lb_orders_history im Unity Catalog gespeichert. Nachgelagerte Pipelines lesen diesen Änderungsfeed und aktualisieren eine inventory_levels-Tabelle, sobald eine Bestellung aufgegeben, bearbeitet oder storniert wird.

Berechnen des aktuellen Inventars mit einer materialisierten Ansicht

Das einfachste Muster ist eine materialisierte SQL-Ansicht über die Verlaufstabelle. Die MV wird inkrementell aktualisiert, wenn neue Änderungsereignisse eintreffen, und nachgelagerte Nutzer fragen sie wie jede andere Tabelle ab.

CREATE MATERIALIZED VIEW inventory_levels AS
SELECT
  item_id,
  SUM(
    CASE
      -- New orders (and the "new half" of updates) decrement inventory
      WHEN _pg_change_type IN ('insert', 'update_postimage') THEN -quantity
      -- Cancellations (and the "old half" of updates) restore inventory
      WHEN _pg_change_type IN ('delete', 'update_preimage') THEN quantity
      ELSE 0
    END
  ) AS current_inventory,
  MAX(_timestamp) AS last_transaction_ts,
  MAX(_pg_lsn) AS last_lsn
FROM lb_orders_history
GROUP BY item_id;

Die beiden Zeilen, die für jede Aktualisierung erstellt wurden, brechen einander ab, mit Ausnahme der Nettoänderung, sodass die laufende Summe korrekt bleibt, wenn Bestellungen bearbeitet werden.

Streamänderungen mit Spark Declarative Pipelines

Verwenden Sie für eine strukturierte Medallion-Architektur Spark Declarative Pipelines (SDP), um Bronze-, Silber- und Goldtabellen zu deklarieren. SDP führt sie als zusammenhängende Pipeline aus, wobei Prüfpunkte und die Abhängigkeitsverwaltung für Sie übernommen werden.

import dlt
from pyspark.sql import functions as F

@dlt.table
def inventory_adjustments():
    return (
        spark.readStream.table("<catalog>.<schema>.lb_orders_history")
        .withColumn(
            "delta",
            F.when(F.col("_pg_change_type").isin("insert", "update_postimage"), -F.col("quantity"))
             .when(F.col("_pg_change_type").isin("delete", "update_preimage"), F.col("quantity"))
             .otherwise(0),
        )
        .select("item_id", "delta", "_timestamp")
    )

@dlt.expect_or_drop("non_negative_stock", "on_hand >= 0")
@dlt.table
def inventory_levels():
    return (
        spark.read.table("LIVE.inventory_adjustments")
        .groupBy("item_id")
        .agg(F.sum("delta").alias("on_hand"))
    )

inventory_adjustments lb_orders_history liest inkrementell mit readStream und erzeugt ein Delta pro Ereignis. inventory_levels aggregiert nach item_id, um den aktuellen Lagerbestand zu berechnen. Es wird erwartet, dass Zeilen verworfen werden, die den Bestand ins Negative bringen würden, was auf einen vorgelagerten Fehler hindeutet.

Eine vollständige End-to-End-Anleitung finden Sie im Tutorial: Erstellen Sie eine ETL-Pipeline mithilfe der Änderungsdatenerfassung.

Benutzerdefinierte Verarbeitung mit Spark Structured Streaming

Wenn Sie vollständige Kontrolle benötigen – z. B. für benutzerdefinierte Zusammenführungen, Seiteneffekte oder mehrere Zielsysteme –, lesen Sie die Verlaufstabelle direkt mit Spark Structured Streaming und verwenden Sie foreachBatch, um in Ihr Zielsystem zu schreiben.

from pyspark.sql import functions as F
from delta.tables import DeltaTable

def update_inventory(batch_df, batch_id):
    deltas = (
        batch_df
        .withColumn(
            "delta",
            F.when(F.col("_pg_change_type").isin("insert", "update_postimage"), -F.col("quantity"))
             .when(F.col("_pg_change_type").isin("delete", "update_preimage"), F.col("quantity"))
             .otherwise(0),
        )
        .groupBy("item_id")
        .agg(F.sum("delta").alias("delta"))
    )

    target = DeltaTable.forName(spark, "<catalog>.<schema>.inventory_levels")
    (target.alias("t")
        .merge(deltas.alias("s"), "t.item_id = s.item_id")
        .whenMatchedUpdate(set={"on_hand": F.expr("t.on_hand + s.delta")})
        .whenNotMatchedInsert(values={"item_id": "s.item_id", "on_hand": "s.delta"})
        .execute())

(spark.readStream.table("<catalog>.<schema>.lb_orders_history")
    .writeStream
    .foreachBatch(update_inventory)
    .option("checkpointLocation", "/Volumes/<catalog>/<schema>/checkpoints/inventory_levels")
    .start())

Jeder Mikro-Batch aggregiert die Änderungsereignisse nach item_id und führt die Netto-Deltas in inventory_levels zusammen.

Von Grund auf inkrementell. Jede lb_<table_name>_history Tabelle ist eine Nur-Anfüge-Delta-Tabelle. Jede Quelländerung wird als neue Zeile aufgezeichnet, wobei _pg_change_type den Vorgang kennzeichnet. Databricks SQL materialized views, Lakeflow Spark Declarative Pipelines-Flows und Spark Structured Streaming-Jobs verarbeiten neue Zeilen inkrementell aus dem Delta-Transaktionsprotokoll, sodass nachgelagerte Pipelines nur Arbeit im Verhältnis zu den Änderungen ausführen. Sie müssen den Delta-Änderungsdatenfeed in der Verlaufstabelle nicht aktivieren, da die Änderungssemantik bereits in den Zeilendaten codiert ist.

Datentypzuordnung

CDF unterstützt die meisten standardmäßigen PostgreSQL-Grundtypen. Typen ohne direkte Delta-Entsprechung werden als STRING gespeichert.

PostgreSQL-Typ Azure Databricks Delta-Typ Hinweise
BOOLEAN BOOLEAN
INT, SMALLINT, BIGINT INT, SMALLINT, BIGINT
TEXT, VARCHAR, CHAR STRING
JSONB STRING Als JSON-Zeichenfolge gespeichert.
Enum STRING Wird als Enum-Bezeichnung gespeichert.
NUMERISCH /DEZIMAL DEZIMAL ODER ZEICHENFOLGE Verwendet nach Möglichkeit die Präzision/Skalierung der Quelle. Führt eine verlustlose Reskalierung für inkompatible Genauigkeits-/Skalierungswerte aus. Fällt auf STRING zurück, wenn die Genauigkeit 38 überschreitet oder wenn Genauigkeit/Skalierung nicht definiert (ungebundene ZAHL) ist. Alle NUMERISCHEn/DEZIMALspalten können nullwerte sein, da NaN-Werte NULL zugeordnet sind. Siehe PostgreSQL-numerische Typen.
DATE DATE
TIMESTAMP TIMESTAMP_NTZ
TIMESTAMPTZ TIMESTAMP
FLOAT, DOUBLE FLOAT, DOUBLE

Als STRING gespeicherte Typen:

  • Geografie/Geometrie (PostGIS): Typen aus der PostGIS-Erweiterung (z. B geometry. , geography).
  • Vektor (pgvector): Der vector Typ aus der Erweiterung pgvector.
  • Zusammengesetzte Typen/Strukturtypen: Benutzerdefinierte Typen, die mit CREATE TYPE ... AS (field_name type, ...) definiert sind. Hierbei handelt es sich um zeilenähnliche Typen mit benannten Feldern.
  • Map: Map-ähnliche Schlüssel-Wert-Typen wie hstore (aus der Erweiterung hstore). Postgres hat keinen integrierten Kartentyp. hstore ist die gängige Methode zum Speichern von Schlüsselwertpaaren in einer Spalte.

Verwalten von Schemaänderungen

  • Durch das Umbenennen einer Tabelle in Postgres (z. B ALTER TABLE users RENAME TO customers. ) kann der Feed fortgesetzt werden. Der Name der Ziel-Delta-Tabelle ändert sich nicht — er bleibt lb_users_history.
  • Schemaänderungen (Hinzufügen einer Spalte, Ablegen einer Spalte oder Ändern des Datentyps einer Spalte) lösen eine erneute Momentaufnahme der betroffenen Tabelle aus. CDF liest die gesamte Tabelle aus Postgres neu und schreibt sie in die Ziel-Delta-Tabelle um.

Lakebase CDF deaktivieren

Durch deaktivieren des CDF wird der Feed für alle Lakebase-Schemas im Projekt beendet.

  1. Öffnen Sie in Ihrem Azure Databricks Arbeitsbereich Lakebase Postgres über den App-Switcher (oben rechts).
  2. Wählen Sie Ihr Lakebase-Projekt und den Zweig aus, in dem Sie CDF konfiguriert haben.
  3. Öffnen Sie die Branch-Übersicht und klicken Sie auf die Registerkarte „Change Data Feed“.
  4. Klicken Sie auf "Deaktivieren". Überprüfen Sie im Bestätigungsdialog die Warnung, dass keine Änderungen mehr in Delta-Tabellen übernommen werden, und klicken Sie dann zur Bestätigung erneut auf Deaktivieren.

Durch deaktivieren des CDF wird die Berechnung nicht neu gestartet.

Warning

Wenn Sie CDF später erneut aktivieren, führt das System keine vollständige erneute Momentaufnahme durch. Alle Änderungen, die während der Deaktivierung des CDF aufgetreten sind, fehlen dauerhaft in den Delta-Zieltabellen.

Einschränkungen und Problembehandlung

Sie können den Status jeder Tabelle (Snapshot-Erstellung, übersprungen oder Streaming) auf der Registerkarte Change Data Feed anzeigen oder indem Sie Folgendes in Lakebase ausführen:

SELECT * FROM wal2delta.tables;

Häufige Gründe, warum eine Tabelle nicht im Feed angezeigt wird:

  • REPLICA IDENTITY FULL nicht festgelegt: Führen Sie ALTER TABLE <table_name> REPLICA IDENTITY FULL; für die Tabelle aus. Siehe Schritt 1: Festlegen der Replikatidentität vollständig.
  • Partitionierte Tabellen: Partitionierte Lakebase-Tabellen werden nicht unterstützt. Ein Schema, das partitionierte Tabellen enthält, führt dazu, dass diese Tabellen fehlschlagen.
  • Leere Tabellen: Eine Tabelle mit null Zeilen wird übersprungen, bis mindestens eine Zeile vorhanden ist.

Nächste Schritte