Freigeben über


Bewährte Methoden für Lakeflow Spark Declarative Pipelines

Auf dieser Seite werden empfohlene Muster für das Entwerfen, Erstellen und Betreiben von Pipelines mit Lakeflow Spark Declarative Pipelines beschrieben. Wenden Sie diese Richtlinien an, wenn Sie eine neue Pipeline starten oder eine vorhandene Pipeline verbessern.

Auswählen des richtigen Datasettyps

Lakeflow Spark Declarative Pipelines bietet drei Datasettypen: Streamingtabellen, materialisierte Ansichten und temporäre Ansichten. Die Auswahl des richtigen Typs für jede Ebene Ihrer Pipeline vermeidet unnötige Berechnungskosten und sorgt dafür, dass Ihr Code leicht zu begründen ist.

Streamingtabellen sind die richtige Wahl für Datenaufnahme und Streamingtransformationen mit geringer Latenz. Jede Eingabezeile wird nur einmal gelesen und verarbeitet, wodurch sie ideal für Append-Only-Workloads, hochvolumige Daten und event-getriebene Verarbeitung aus Cloud-Speicher oder Nachrichtenbussen geeignet sind.

Materialisierte Ansichten sind die richtige Wahl für komplexe Transformationen und analytische Abfragen. Ihre Ergebnisse werden vorab berechnet und mithilfe der inkrementellen Aktualisierung auf dem neuesten Stand gehalten, sodass Abfragen dafür schnell sind. Sie können die Daten in einer materialisierten Ansicht nicht direkt ändern – die Abfragedefinition steuert die Ausgabe.

Temporäre Ansichten sind Pipeline-gebundene Ansichten, die Ihre Transformationslogik organisieren, ohne Daten zu speichern. Verwenden Sie sie für Zwischenschritte, die keine eigene Tabelle benötigen.

In der folgenden Tabelle wird zusammengefasst, wann jeder Typ verwendet werden soll:

Anwendungsfall Empfohlener Typ Grund
Einbindung aus Cloudspeicher oder Nachrichten-Bus Streamingtabelle Verarbeitet jeden Datensatz einmal; behandelt Nur-Anfüge-Workloads mit hohem Volumen.
CDC-Datenströme (Einfügungen, Aktualisierungen, Löschungen) Streamingtabelle Als Ziel wird APPLY CHANGES INTO für die geordnete und deduplizierte CDC-Aufnahme verwendet.
Komplexe Aggregationen und Verknüpfungen Materialisierte Ansicht Inkrementell aktualisiert; vermeidet die vollständige Neuberechnung für jedes Update.
Beschleunigung von Dashboard-Abfragen Materialisierte Ansicht Vorab berechnete Ergebnisse beschleunigen Abfragen im Vergleich zu Rohdaten-Tabellen.
Zwischentransformationen (keine nachgeschalteten Leser) Temporäre Ansicht Organisiert Pipelinelogik, ohne dass Speicherkosten anfallen.

Weitere Informationen finden Sie in den Konzepten "Streamingtabellen", " Materialisierte Ansichten" und " Lakeflow Spark Declarative Pipelines".

Deklaratives CDC anstelle von imperativem MERGE verwenden

Das Implementieren der Änderungsdatenerfassung (Change Data Capture, CDC) mit imperativen SQL-Anweisungen MERGE erfordert erheblichen benutzerdefinierten Code, um die Ereignisbestellung, Deduplizierung, partielle Updates und die Schemaentwicklung ordnungsgemäß zu behandeln. Jedes dieser Bedenken muss unabhängig gelöst werden, und der resultierende Code ist schwer zu warten und zu testen.

Lakeflow Spark Declarative Pipelines stellt die Anweisung APPLY CHANGES INTO (SQL) und die Funktion apply_changes() (Python) bereit, die Sortierung, Deduplizierung, Out-of-Order-Ereignisse und Schemaentwicklung deklarativ behandelt. Sie beschreiben die Form des Änderungsfeeds und der Zieltabelle – die Pipeline übernimmt den Rest. APPLY CHANGES INTO unterstützt sowohl SCD Type 1 (Überschreiben) als auch SCD Type 2 (Historienbewahrung).

Weitere Informationen finden Sie unter Änderungsdatenerfassung und Momentaufnahmen und Die AUTO CDC-APIs: Vereinfachen der Änderungsdatenerfassung mit Pipelines.

Sicherstellung der Datenqualität durch Erwartungen

Erwartungen sind wahr/falsch SQL-Ausdrücke, die auf jede Zeile angewendet werden, die durch ein Dataset übergeben wird. Wenn eine Zeile die Bedingung nicht erfüllt, reagiert die Pipeline gemäß der von Ihnen konfigurierten Verletzungsrichtlinie. Erwartungen melden Metriken unabhängig von der Policy an das Pipeline-Ereignisprotokoll, sodass Sie Datenqualitätstrends im Laufe der Zeit nachverfolgen können.

Auswählen einer Verletzungsrichtlinie

Es stehen drei Richtlinien für Verstöße zur Verfügung. Wählen Sie das Objekt aus, das Ihrer Toleranz für schlechte Daten entspricht:

  • warnen (Standard): Datensätze, die ungültig sind, werden in die Zieltabelle geschrieben und in Metriken gekennzeichnet. Verwenden Sie diese Richtlinie, wenn Sie alle Daten erfassen müssen, aber Einen Einblick in Qualitätsprobleme wünschen.
  • drop: Datensätze, die ungültig sind, werden vor dem Schreiben verworfen. Verwenden Sie dies, wenn fehlerhafte Zeilen erwartet werden und sich nicht nachgelagert ausbreiten dürfen.
  • fail: Das Pipelineupdate wird für den ersten ungültigen Datensatz beendet. Verwenden Sie dies für kritische Daten, bei denen ein fehlerhafter Datensatz auf ein ernsthaftes vorhergehendes Problem hinweist.

Die folgenden Beispiele zeigen jede Richtlinie, die auf eine Streamingtabelle angewendet wird:

SQL

-- Warn: write invalid records but track them in metrics
CREATE OR REFRESH STREAMING TABLE orders_raw (
  CONSTRAINT valid_order_id EXPECT (order_id IS NOT NULL)
) AS SELECT * FROM STREAM read_files("/volumes/raw/orders", format => "json");

-- Drop: discard invalid records before writing
CREATE OR REFRESH STREAMING TABLE orders_clean (
  CONSTRAINT non_negative_amount EXPECT (amount >= 0) ON VIOLATION DROP ROW
) AS SELECT * FROM STREAM(orders_raw);

-- Fail: stop the pipeline on any invalid record
CREATE OR REFRESH STREAMING TABLE orders_critical (
  CONSTRAINT required_customer_id EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE
) AS SELECT * FROM STREAM(orders_clean);

Python

from pyspark import pipelines as dp

# Warn: write invalid records but track them in metrics
@dp.table
@dp.expect("valid_order_id", "order_id IS NOT NULL")
def orders_raw():
    return spark.readStream.format("cloudFiles") \
        .option("cloudFiles.format", "json") \
        .load("/volumes/raw/orders")

# Drop: discard invalid records before writing
@dp.table
@dp.expect_or_drop("non_negative_amount", "amount >= 0")
def orders_clean():
    return spark.readStream.table("orders_raw")

# Fail: stop the pipeline on any invalid record
@dp.table
@dp.expect_or_fail("required_customer_id", "customer_id IS NOT NULL")
def orders_critical():
    return spark.readStream.table("orders_clean")

Ungültige Datensätze unter Quarantäne

Wenn Sie verlorene Datensätze für die Untersuchung beibehalten möchten, anstatt sie stillschweigend zu verwerfen, verwenden Sie ein Quarantänemuster. Leiten Sie Zeilen, die die Validierung nicht bestehen, an eine separate Streamingtabelle weiter, indem Sie zwei Abläufe verwenden: einen, der ungültige Zeilen aus der Haupttabelle entfernt, und einen zweiten, der nur die ungültigen Zeilen in eine Quarantänetabelle schreibt. Auf diese Weise können Sie fehlerhafte Daten untersuchen, korrigieren und erneut verarbeiten, ohne ihr sauberes Dataset zu verunreinigen.

Ein detailliertes Beispiel für das Quarantänemuster finden Sie unter Erwartungsempfehlungen und erweiterte Muster.

Weitere Informationen zu den Erwartungen finden Sie unter Verwalten der Datenqualität mit den Pipeline-Erwartungen.

Parametrisieren Ihrer Pipelines

Pipelines verfügen über Standardkatalog- und Schemaeinstellungen, sodass Code, der innerhalb desselben Katalogs und Schemas liest und schreibt, umgebungsübergreifend ohne Parameter funktioniert. Wenn Ihre Pipeline jedoch auf einen zweiten Katalog oder ein zweites Schema verweisen muss – zum Beispiel aus einem gemeinsamen Quellkatalog, der sich zwischen Entwicklung und Produktion unterscheidet –, vermeiden Sie es, diese Namen fest im Code einzubinden. Definieren Sie sie stattdessen als Pipelinekonfigurationsparameter (Schlüsselwertpaare, die in den Pipelineeinstellungen festgelegt sind), und verweisen Sie in Ihrem Code darauf. Auf diese Weise kann eine einzelne Codebasis korrekt und umgebungsübergreifend ausgeführt werden, indem die Parameterwerte ausgetauscht werden.

SQL

CREATE OR REFRESH MATERIALIZED VIEW transaction_summary AS
SELECT account_id, COUNT(txn_id) AS txn_count, SUM(amount) AS total_amount
FROM ${source_catalog}.sales.transactions
GROUP BY account_id;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import count, sum

@dp.materialized_view
def transaction_summary():
    source_catalog = spark.conf.get("source_catalog")
    return spark.read.table(f"{source_catalog}.sales.transactions") \
        .groupBy("account_id") \
        .agg(
            count("txn_id").alias("txn_count"),
            sum("amount").alias("total_amount")
        )

Weitere Informationen finden Sie unter Verwenden von Parametern mit Pipelines.

Auswählen des richtigen Pipelinemodus für jede Umgebung

Entwicklungs- und Produktions-Update-Modi

Pipelines werden entweder im Entwicklungs-Update-Modus oder im Produktions-Update-Modus ausgeführt. Wählen Sie den Modus aus, der Ihrem Ziel entspricht.

Im Entwicklungsmodus verwendet die Pipeline einen lang laufenden Cluster über Updates hinweg und führt bei Fehlern keine Wiederholungen aus. Dadurch wird der Iterationszyklus beschleunigt, wenn Sie Pipelinecode erstellen und testen, da Fehlerdetails sofort angezeigt werden, ohne auf Clusterneustarts zu warten.

Im Produktionsmodus wird der Cluster nach Abschluss jedes Updates umgehend heruntergefahren, wodurch die Berechnungskosten reduziert werden. Die Pipeline wendet auch eskalierende Wiederholungen an, einschließlich Clusterneustarts, um vorübergehende Ausfälle der Infrastruktur automatisch zu behandeln. Verwenden Sie den Produktionsmodus für alle geplanten Pipelineausführungen.

Ausgelöster und fortlaufender Pipelinemodus

Der ausgelöste Modus verarbeitet alle verfügbaren Daten und stoppt dann. Dies ist die richtige Wahl für die überwiegende Mehrheit der Pipelines: diejenigen, die in einem Zeitplan (stündlich, täglich oder bei Bedarf) ausgeführt werden und keine Aktualität von Unterminutendaten erfordern.

Im kontinuierlichen Modus läuft der Cluster und verarbeitet neue Daten, sobald sie eingehen. Dies ist nur dann sinnvoll, wenn für Den Anwendungsfall eine Latenz im Bereich von Sekunden bis Minuten erforderlich ist. Da für den fortlaufenden Modus ein immer aktivierter Cluster erforderlich ist, ist er wesentlich teurer als der ausgelöste Modus.

Weitere Informationen finden Sie unter Triggered vs. continuous pipeline mode und Pipelines konfigurieren.

Verwenden von Flüssigclustering für das Datenlayout

Das Flüssige Clustering ersetzt statische Partitionierung und ZORDER optimiert das Datenlayout in Delta-Tabellen. Im Gegensatz zur Partitionierung, die von Ihnen erfordert, dass Sie vorab eine Partitionsspalte auswählen und zu einer Neigung der Daten führen kann, wenn Werte ungleichmäßig verteilt sind, ist Liquid Clustering selbstoptimierend, verzerrungsresistent und inkrementell – nur die Daten, die eine Reorganisation erfordern, werden bei jeder Ausführung neu geschrieben.

Ändern Sie Clusterspalten jederzeit, ohne die vollständige Tabelle neu zu schreiben, während sich Abfragemuster entwickeln.

Definieren Sie Clusterspalten in Ihrer Streamtabelle-Definition:

SQL

CREATE OR REFRESH STREAMING TABLE events
CLUSTER BY (event_date, region)
AS SELECT * FROM STREAM read_files("/volumes/raw/events", format => "parquet");

Python

from pyspark import pipelines as dp

@dp.table(cluster_by=["event_date", "region"])
def events():
    return spark.readStream.format("cloudFiles") \
        .option("cloudFiles.format", "parquet") \
        .load("/volumes/raw/events")

Wenn Sie sich nicht sicher sind, nach welchen Spalten clusteriert werden soll, können Sie die Auswahl der optimalen Clusterspalten basierend auf der Abfragelast automatisch CLUSTER BY AUTO Databricks überlassen.

Weitere Informationen finden Sie unter Streamingtabellen und Verwenden von flüssigen Clustern für Tabellen.

Verwalten von Pipelines mit CI/CD und Databricks Asset Bundles

Versionskontrolle des Pipeline-Quellcodes und Verwenden von Databricks Asset Bundles zum Verwalten von Bereitstellungen in allen Umgebungen.

Weitere Informationen finden Sie unter Erstellen einer versionsgesteuerten Pipeline, Konvertieren einer Pipeline in ein Databricks Asset-Bundle-Projekt und Verwendung von Parametern mit Pipelines.

Pipeline-Code im Versionskontrollsystem speichern

Speichern Sie alle Pipelinequelldateien (Python und SQL) zusammen mit Ihrer Bundlekonfiguration in einem Git-Repository. Mit der Versionssteuerung des vollständigen Projekts erhalten Sie einen vollständigen Verlauf der Änderungen, vereinfachen die Zusammenarbeit und können Änderungen in einer Entwicklungsumgebung überprüfen, bevor Sie sie in die Produktion fördern.

Databricks empfiehlt Databricks Asset Bundles für die Verwaltung dieses Workflows. Ein Bündel definiert Ihre Pipelinekonfiguration in YAML zusammen mit Ihrem Quellcode, und mit der databricks bundle CLI können Sie Pipelines aus Ihrem Terminal oder einem CI/CD-System überprüfen, bereitstellen und ausführen.

Verwenden von Bundlezielen für die Umgebungsisolation

Bündel ermöglichen mehrere Ziele (z. B. dev, staging, prod), jedes mit eigenen Außerkraftsetzungen für Katalognamen, Clusterrichtlinien, Benachrichtigungsadressen und andere Einstellungen. Kombinieren Sie Bündelziele mit Pipelineparametern, um die richtigen umgebungsspezifischen Werte zur Bereitstellungszeit einzuführen und den Quellcode frei von Umgebungskonstanten zu halten.

Ein typischer Workflow sieht wie folgt aus:

  1. Ein Entwickler arbeitet an einem Feature-Branch, der in einer persönlichen Entwicklungspipeline in einem Entwicklerkatalog bereitgestellt wird.
  2. Beim Zusammenführen mit dem Hauptzweig führt ein CI-System databricks bundle validate und databricks bundle deploy --target staging aus, um die Pipeline in einer Staging-Umgebung zu validieren und bereitzustellen.
  3. Nach bestandenen Tests wird das CI-System in die Produktion mit databricks bundle deploy --target prod verwendet.

Bewährte Methoden für Streaming

Verwenden Sie diese Muster, um Zustand zu verwalten, verspätete Daten zu steuern und Streamingpipelinen zuverlässig zu halten.

Weitere Informationen finden Sie unter Optimieren der zustandsbehafteten Verarbeitung mit Wasserzeichen, Wiederherstellen einer Pipeline bei Fehlern mit Streaming-Prüfpunkten und Nachfüllen von historischen Daten mit Pipelines.

Verwenden von Wasserzeichen für zustandsbehaftete Vorgänge

Wasserzeichen gebunden den Zustand, den die Pipeline während zustandsbehafteter Streamingvorgänge im Arbeitsspeicher behält, z. B. fenstergebundene Aggregationen und Deduplizierung. Ohne Wasserzeichen wächst der Zustand ungebunden, da die Pipeline Daten für jeden möglichen Schlüssel ansammelt, was schließlich zu Out-of-Memory-Fehlern bei langen Pipelines führt.

Ein Wasserzeichen gibt eine Zeitstempelspalte und einen Toleranzschwellenwert für verspätete Daten an. Datensätze, die nach Ablauf des Schwellenwerts eingehen, werden verworfen. Wählen Sie einen Schwellenwert aus, der ihre Toleranz für verspätete Daten gegen die Speicherkosten ausgleicht, um diesen Zustand offen zu halten.

Im folgenden Beispiel wird eine einminütige Tumbling-Window-Aggregation mit einem dreiminütigen Wasserzeichen berechnet.

SQL

CREATE OR REFRESH STREAMING TABLE event_counts AS
SELECT window(event_time, '1 minute') AS time_window, region, COUNT(*) AS cnt
FROM STREAM(events_raw)
  WATERMARK event_time DELAY OF INTERVAL 3 MINUTES
GROUP BY time_window, region;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import window

@dp.table
def event_counts():
    return (
        spark.readStream.table("events_raw")
            .withWatermark("event_time", "3 minutes")
            .groupBy(window("event_time", "1 minute"), "region")
            .count()
    )

Hinweis

Um sicherzustellen, dass Aggregationen inkrementell verarbeitet werden, anstatt für jede Aktualisierung vollständig neu zu komputieren, müssen Sie ein Wasserzeichen definieren.

Grundlegendes zum Streamingstatus und zur vollständigen Aktualisierung

Streaming-Status ist inkrementell: Die Pipeline erstellt und verwaltet den Zustand über Updates hinweg, anstatt jedes Mal neu zu berechnen. Dies macht zustandsbehaftetes Streaming effizient, bedeutet aber auch, dass der vorhandene Zustand nicht mehr mit der neuen Logik kompatibel ist, wenn Sie die Logik einer zustandsbehafteten Abfrage ändern (z. B. einen Wasserzeichenschwellenwert ändern oder Aggregationsspalten ändern). In diesem Fall müssen Sie eine vollständige Aktualisierung durchführen, um alle historischen Daten mit der neuen Logik neu zu verarbeiten und den Zustand von Grund auf neu zu erstellen.

Eine vollständige Aktualisierung kann auch zu Datenverlust führen, wenn die Quelle keine historischen Daten speichert. Beispielsweise kann eine Kafka-Quelle mit einem kurzen Aufbewahrungszeitraum nur die letzten Minuten der Daten zum Zeitpunkt der Aktualisierung verfügbar sein, was zu einer Tabelle führt, die weit weniger Daten enthält als zuvor. Planen Sie Änderungen an der Logik zustandsbehafteter Abfragen sorgfältig, insbesondere für Datenströme mit hohem Volumen, bei denen eine vollständige Aktualisierung kostenintensiv ist oder wenn die Quelle nur begrenzte Datenaufbewahrung bietet. Die Verwendung der Medallion-Architektur trägt dazu bei, Bronzetabellen mit minimaler Transformation zu erstellen und ermöglicht es, dass Silber- oder Goldtabellen aus den Bronzetabellen mit gesamter Historie neu berechnet werden.

Stream-Stream-Verknüpfungen

Stream-Stream-Verknüpfungen erfordern ein Wasserzeichen auf beiden Seiten der Verknüpfung und eine zeitgebundene Verknüpfungsbedingung. Das Zeitintervall in der Verknüpfungsbedingung teilt dem Streamingmodul mit, wenn keine weiteren Übereinstimmungen möglich sind, sodass er den Zustand entfernen kann, der nicht mehr abgeglichen werden kann. Wenn Sie entweder die Wasserzeichen oder die zeitgebundene Bedingung weglassen, wächst der Zustand ohne Grenzen.

Im folgenden Beispiel werden Anzeigeneindruckereignisse mit Klickereignissen verknüpft, sodass der Klick innerhalb von drei Minuten nach dem Eindruck erfolgen muss:

SQL

CREATE OR REFRESH STREAMING TABLE impression_clicks AS
SELECT imp.ad_id, imp.impression_time, clk.click_time
FROM STREAM(ad_impressions)
    WATERMARK impression_time DELAY OF INTERVAL 3 MINUTES AS imp
JOIN STREAM(user_clicks)
    WATERMARK click_time DELAY OF INTERVAL 3 MINUTES AS clk
ON imp.ad_id = clk.ad_id
  AND clk.click_time BETWEEN imp.impression_time
    AND imp.impression_time + INTERVAL 3 MINUTES;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import expr

dp.create_streaming_table("impression_clicks")

@dp.append_flow(target="impression_clicks")
def join_impressions_and_clicks():
    impressions = spark.readStream.table("ad_impressions") \
        .withWatermark("impression_time", "3 minutes")
    clicks = spark.readStream.table("user_clicks") \
        .withWatermark("click_time", "3 minutes")
    return impressions.alias("imp").join(
        clicks.alias("clk"),
        expr("""
            imp.ad_id = clk.ad_id AND
            clk.click_time BETWEEN imp.impression_time AND imp.impression_time + INTERVAL 3 MINUTES
        """),
        "leftOuter"
    )

Wenn Sie einen Datenstrom mit einer statischen Tabelle (snapshot join) kombinieren, wird die Momentaufnahme der statischen Tabelle zu Beginn jedes Mikrobatchs aktualisiert. Dies bedeutet, dass spät eintreffende Dimensionsdatensätze nicht rückwirkend auf bereits verarbeitete Daten angewendet werden. Wenn eine rückwirkende Anwendung erforderlich ist, verwenden Sie eine materialisierte Ansicht, oder strukturieren Sie die Pipeline neu.

Optimieren der Pipelineleistung

Wenden Sie diese Techniken an, um Rechenkosten zu reduzieren und Pipelineupdates zu beschleunigen.

Weitere Informationen finden Sie unter Materialisierte Ansichten und Optimierung der zustandsbehafteten Verarbeitung mit Wasserzeichen.

Vermeiden kleiner Dateien

Wenn eine Pipeline zu häufig auf einer Quelle mit geringem Volumen ausgelöst wird, wird eine große Anzahl kleiner Dateien in den Cloudspeicher geschrieben. Kleine Dateien beeinträchtigen die Leseleistung, da für jede Datei eine separate Metadatensuche und E/A-Roundtrip erforderlich sind, und Cloudspeicher-APIs drosseln Listenvorgänge im großen Maßstab. Um dies zu vermeiden, wählen Sie ein Triggerintervall aus, das Ihrem Datenvolume entspricht: Führen Sie ausgelöste Pipelines in einem Zeitplan aus, mit dem eine sinnvolle Menge von Daten zwischen Updates gesammelt werden kann, anstatt kontinuierlich.

Umgang mit Datenverzerrung

Datenverzerrung tritt auf, wenn Werte in einem Join- oder GroupBy-Schlüssel ungleichmäßig über Partitionen verteilt werden, was dazu führt, dass eine kleine Anzahl von Aufgaben den Großteil der Daten verarbeitet. Dadurch werden Hotspots erstellt, die die End-to-End-Updatezeit erhöhen. Verwenden Sie Liquid Clustering, um Verzerrungen in gespeicherten Tabellen zu beheben. Bei Schiefe, die während der Berechnung in der Luft auftritt, salzen Sie stark verzerrte Schlüssel, indem Sie ein zufälliges Bucketsuffix anfügen, bevor Sie in zwei Phasen gruppieren und aggregieren.

Weitere Informationen finden Sie unter Verwenden von Flüssigclustering für das Datenlayout.

Verwenden Sie inkrementelle Aktualisierung für materialisierte Ansichten

Wenn Sie eine materialisierte Ansicht für eine große Aggregation verwenden, versucht Lakeflow Spark Declarative Pipelines, sie inkrementell zu aktualisieren, indem nur die vorgelagerten Änderungen seit der letzten Aktualisierung verarbeitet werden, anstatt das vollständige Ergebnis neu zu berechnen. Die inkrementelle Aktualisierung ist wesentlich günstiger als das erneute Ausführen der Abfrage bei jedem Pipeline-Trigger. Um die Wahrscheinlichkeit zu maximieren, dass eine materialisierte Ansicht inkrementell aktualisiert werden kann, schreiben Sie einfache, deterministische Aggregationsabfragen und vermeiden Sie Konstrukte, die die inkrementelle Verarbeitung verhindern, z. B. nicht deterministische Funktionen.

Siehe inkrementelle Aktualisierung für materialisierte Ansichten.

Optimieren von Verknüpfungen

Fügen Sie für Verknüpfungen, bei denen eine Seite eine kleine Dimensionstabelle ist, einen Übertragungshinweis hinzu, um Spark anzuweisen, die kleinere Tabelle an alle Executoren zu übertragen, anstatt eine Shuffle-Verknüpfung auszuführen:

SQL

CREATE OR REFRESH MATERIALIZED VIEW enriched_orders AS
SELECT o.*, /*+ BROADCAST(p) */ p.product_name, p.category
FROM orders o
JOIN products p ON o.product_id = p.product_id;

Python

from pyspark import pipelines as dp
from pyspark.sql.functions import broadcast

@dp.materialized_view
def enriched_orders():
    orders = spark.read.table("orders")
    products = spark.read.table("products")
    return orders.join(broadcast(products), "product_id")

Verwenden Sie für Näherungsverknüpfungen der Zeitreihe (z. B. das nächste Ereignis innerhalb eines Zeitraums) eine Bereichsverknüpfungsbedingung, und stellen Sie sicher, dass beide Seiten über ein Wasserzeichen verfügen, wenn Datenströme verknüpft werden, oder erwägen Sie, Ereignisse vor dem Verknüpfen in Zeit-Buckets einzuschließen.

Überwachen ihrer Pipelines

Das Pipeline-Ereignisprotokoll ist der primäre Grundbaustein der Beobachtbarkeit in Lakeflow Spark Declarative Pipelines. Jeder Pipeline-Durchlauf schreibt strukturierte Datensätze in das Ereignisprotokoll, die den Ausführungsfortschritt, die Ergebnisse der Datenqualitätsanforderungen, die Datenherkunft und Fehlerdetails abdecken. Das Ereignisprotokoll ist eine Delta-Tabelle, die Sie direkt abfragen können.

Um das Ereignisprotokoll abzufragen, ohne den zugrunde liegenden Speicherpfad zu kennen, verwenden Sie die event_log() Tabellenwertfunktion in einem freigegebenen Cluster oder SQL Warehouse:

SELECT * FROM event_log('<pipeline-id>')
WHERE event_type = 'flow_progress'
ORDER BY timestamp DESC
LIMIT 100;

Erstellen Sie Datenqualitätsdashboards, indem Sie das Ereignisprotokoll für die Erwartungsmetriken abfragen. Die details Spalte enthält eine geschachtelte JSON-Struktur mit Pass-/Fail-Zählungen für jede Einschränkung, die Sie verwenden können, um Qualitätstrends im Laufe der Zeit und Warnungen bei Regressionen nachzuverfolgen.

Verwenden Sie für ereignisgesteuerte Warnungen Ereignis-Hooks, um benutzerdefinierte Webhooks oder Benachrichtigungsdienste (z. B. Slack oder PagerDuty) auszulösen, wenn eine Pipeline fehlschlägt oder wenn ein Schwellenwert für die Datenqualität verletzt wird. Ereignishaken sind Python-Funktionen, die als Reaktion auf Pipelineereignisse ausgeführt werden.

Weitere Informationen finden Sie unter Überwachen von Pipelines, Pipelineereignisprotokoll und Definieren der benutzerdefinierten Überwachung von Pipelines mit Ereignishaken.

Serverlose Berechnung verwenden

Databricks empfiehlt serverlose Berechnung für neue Pipelines. Ohne Server gibt es keine manuelle Clusterkonfiguration – Databricks verwaltet die Infrastruktur automatisch. Serverlose Pipelines verwenden eine erweiterte automatische Skalierung, die sowohl horizontal (mehr Executoren) als auch vertikal (größere Ausführungsgröße) als Reaktion auf Workloadanforderungen skaliert werden kann. Serverlose Pipelines verwenden immer Unity-Katalog, sodass Governance- und Lineage-Tracking standardmäßig integriert sind.

Weitere Informationen finden Sie unter Konfigurieren einer serverlosen Pipeline.

Organisieren von Pipelines mit der Medallion-Architektur

Die Medallion-Architektur organisiert Daten in drei logische Schichten – Bronze, Silber und Gold – jeweils mit einem eindeutigen Zweck. Die Zuordnung von Lakeflow Spark Declarative Pipelines-Datasettypen auf die richtige Ebene sorgt dafür, dass die Verantwortlichkeiten der einzelnen Ebenen klar sind und die Verwaltung von Pipelines vereinfacht wird.

  • Bronze: Verwenden Sie Streamingtabellen zum Aufnehmen von Rohdaten aus Cloudspeicher, Nachrichtenbussen oder CDC-Quellen. Bronzetabellen bewahren die Rohquelldaten mit minimaler Transformation, sodass Silber- oder Goldschichten die Daten von der Quelle der Bronzeschicht neu verarbeiten können, wenn sich die Anforderungen ändern.
  • Silber: Verwenden Sie Streamingtabellen für inkrementelle Transformationen auf Zeilenebene (Filtern, Bereinigen und Analysieren). Verwenden Sie materialisierte Ansichten, wenn die Logik der Silber-Schicht Anreicherungsverknüpfungen mit Dimensionstabellen oder komplexe Aggregationen umfasst, die von einer inkrementellen Aktualisierung profitieren.
  • Gold: Verwenden Sie materialisierte Ansichten, um Aggregationen, Metriken und Zusammenfassungen vorab zu berechnen, die Dashboards, Berichterstellungstools und nachgeschalteten Verbrauchern bereitgestellt werden.

Wenn möglich, trennen Sie die Aufnahme (Bronze) und Transformation (Silber und Gold) in unterschiedliche Pipeline-DAGs. Durch das Decoupieren der Ebenen können Sie jede Ebene unabhängig planen, überwachen und behandeln, und ein Fehler in einer Transformationspipeline blockiert nicht, dass neue Daten in Bronze landen.

Weitere Informationen finden Sie unter Streamingtabellen und materialisierte Ansichten.