Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Important
REPLACE-Flüsse WHERE befinden sich in der Betaversion.
Auf dieser Seite wird beschrieben, wie Sie REPLACE WHERE Flows in Lakeflow Spark Declarative Pipelines verwenden, um eine bestimmte Teilmenge einer Tabelle neu zu berechnen und zu überschreiben, ohne den gesamten Verlauf der Tabelle neu zu verarbeiten. REPLACE-Abläufe WHERE verarbeiten spät ankommende Daten, vorgelagerte Neuverarbeitung, Schemaentwicklung und Rückfüllungen.
In einem REPLACE-Flow WHERE definieren Sie ein Prädikat für die Zieltabelle. Alle Zeilen, die dem Prädikat entsprechen, werden gelöscht und durch erneute Auswertung der Quellabfrage für denselben Prädikatbereich ersetzt. Zeilen, die nicht mit dem Prädikat übereinstimmen, bleiben unverändert.
Requirements
REPLACE-Flüsse WHERE weisen die folgenden Anforderungen auf:
- Ihre Pipeline muss den
PREVIEWKanal verwenden. - Databricks empfiehlt Unity-Katalog und serverlose Berechnung. Die inkrementelle Aktualisierung wird nur auf serverloser Berechnung unterstützt.
Wann REPLACE-WHEREFlows verwendet werden sollten
Verwenden Sie REPLACE WHEREAbläufe für die folgenden Szenarien:
- Inkrementelle Batchverarbeitung ohne Streamingsemantik: Verarbeiten neuer Zeilen in Batches ohne Verwaltung von Streamingkonzepten wie Wasserzeichen.
- Selektive Neuverarbeitung: Kompieren Sie nur Zeilen, die einem Prädikat entsprechen, während alle anderen Zeilen unverändert bleiben.
-
Szenarien, die über standardmäßige materialisierte Ansichtsfunktionen hinausgehen:
- Zieltabellen mit längerer Aufbewahrung als die Quelle
- Verhindern der Neukompilierung, wenn sich eine Dimensionstabelle ändert
- Schema-Evolution ohne Neuberechnung der gesamten Historie
Erstellen eines REPLACE-Flusses WHERE
Definieren Sie REPLACE WHERE Flows in SQL oder Python.
SQL
Verwenden Sie die Klausel FLOW REPLACE WHERE inline mit CREATE STREAMING TABLE:
CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Alternativ können Sie die Longformsyntax CREATE FLOW verwenden:
CREATE STREAMING TABLE orders_enriched;
CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Python
In Python werden die Tabelle und der Fluss in einer einzelnen Anweisung definiert. Der Fluss erbt denselben Namen wie die Tabelle:
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
product_dim = spark.read.table("product_dim")
return orders_fct.join(product_dim, "product_id")
Der replace_where Parameter akzeptiert entweder einen PySpark-Spaltenausdruck oder ein Zeichenfolgen-Prädikat.
In diesen Beispielen werden alle Zeilen der letzten 7 Tage aus orders_enriched gelöscht und anhand der Quellabfrage neu berechnet. Sie müssen das Prädikat nicht zur Quellabfrage hinzufügen. Die Pipeline-Engine wendet es automatisch an, wenn Daten aus der Quelle gelesen werden.
Note
BY NAME ist in SQL erforderlich. Sie gleicht Spalten anhand des Namens und nicht der Position ab.
Zurückfüllen von historischen Daten
Wenn Sie verlaufsbezogene oder korrigierte Zeilen außerhalb geplanter Aktualisierungen in die Zieltabelle schreiben möchten, wählen Sie zwischen zwei Mechanismen basierend auf dem Speicherort der historischen Daten aus:
- Prädikatüberschreibungen: Führen Sie die Quellabfrage des Flusses für einen einmaligen Prädikatbereich erneut aus. Wird verwendet, wenn die Verlaufsdaten aus derselben Quelle stammen wie die inkrementellen Daten.
- DML-Anweisungen: Fügen Sie direkt in die Zieltabelle ein, und umgehen Sie den Fluss. Wird verwendet, wenn sich die historischen Daten in einer anderen Quelle befinden als die inkrementellen Daten.
Prädikatüberschreibungen
Überschreiben Sie das REPLACE-Prädikat WHERE für ein einzelnes Pipelineupdate, ohne die Pipelinedefinition zu ändern. Überschreibungen von Prädikaten sind einmalig, gelten nur für die aktuelle Aktualisierung und wirken sich nicht auf zukünftige Durchläufe aus.
Beispiel: Initialer historischer Ladevorgang
So führen Sie beim erstmaligen Einrichten einer Pipeline eine einmalige Nachbefüllung historischer Daten durch:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
)
print(resp)
Beispiel: Korrigieren einer Spalte für einen bestimmten Zeitraum
Nachdem Sie eine Spaltendefinition aktualisiert haben, übernehmen Sie die Änderung rückwirkend für einen bestimmten historischen Bereich:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30)",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
refresh_selection=["orders_enriched"],
)
print(resp)
Kombinieren mehrerer Dimensionen in einer einzelnen Prädikatüberschreibung:
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
}
]
Hilfsfunktion: start_update_with_replace_where
Verwenden Sie die Pipelineupdate-API aus einem Notizbuch, um Prädikatüberschreibungen zu übermitteln:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse
def start_update_with_replace_where(
pipeline_id: str,
replace_where_overrides: list[dict],
refresh_selection: list[str] = None,
) -> StartUpdateResponse:
"""Start a pipeline update with REPLACE WHERE predicate overrides."""
client = WorkspaceClient()
body = {
"pipeline_id": pipeline_id,
"cause": "JOB_TASK",
"update_cause_details": {
"job_details": {"performance_target": "PERFORMANCE"}
},
"replace_where_overrides": replace_where_overrides,
}
if refresh_selection:
body["refresh_selection"] = refresh_selection
res = client.api_client.do(
"POST",
f"/api/2.0/pipelines/{pipeline_id}/updates",
body=body,
headers={"Accept": "application/json", "Content-Type": "application/json"},
)
return StartUpdateResponse.from_dict(res)
DML-Anweisungen
Führen Sie DML-Anweisungen direkt auf der Zieltabelle außerhalb der Pipeline aus, um Erstladungen oder Korrekturen durchzuführen, z. B. das Laden aus einer Legacy-Tabelle:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
Zeilen, die über DML eingefügt werden, unterliegen nicht dem REPLACE-Prädikat WHERE und bleiben bei geplanten Aktualisierungen erhalten, es sei denn, sie fallen in den Prädikatbereich einer zukünftigen Ausführung.
Vollständiges Aktualisierungsverhalten
Eine vollständige Aktualisierung eines REPLACE WHERE-Flows führt die Quellabfrage unter ausschließlicher Verwendung des aktuellen Prädikats erneut aus. Zeilen, die durch Prädikatüberschreibungen oder DML-Anweisungen außerhalb des aktuellen Prädikatbereichs eingefügt wurden, werden endgültig gelöscht.
Warning
Eine vollständige Aktualisierung löscht alle vorhandenen Daten und führt den Fluss nur mithilfe des definierten Prädikats erneut aus. Wenn eine Pipeline ein Jahr lang mit einem 7-Tage-Prädikat ausgeführt wurde, führt eine vollständige Aktualisierung dazu, dass die Tabelle nur die Daten der letzten 7 Tage enthält. Alle älteren Zeilen werden endgültig gelöscht.
Um vollständige Aktualisierungen einer Tabelle zu verhindern, legen Sie die Tabelleneigenschaft pipelines.reset.allowed auf false. Siehe Pipeline-Eigenschaften-Referenz.
Schrittweise Aktualisierung
REPLACE-Flüsse WHERE verwenden nach Möglichkeit eine inkrementelle Aktualisierung und verarbeiten nur die Quelldaten, die seit der letzten Aktualisierung geändert wurden, anstatt das gesamte Ersetzungsfenster neu zu komputieren. Für die inkrementelle Aktualisierung ist eine serverlose Berechnung erforderlich.
Wann die inkrementelle Aktualisierung gilt
Alle folgenden Punkte müssen wahr sein:
- Die Pipeline läuft auf serverloser Recheninfrastruktur.
- Die Abfragestruktur wird unterstützt. Siehe inkrementelle Aktualisierung für den unterstützten Operatorsatz.
- Das Prädikat verweist auf Basisspalten aus einer Quelltabelle. Prädikate für abgeleitete Werte, z. B. Aggregat- oder Fensterfunktionsausgaben, können nicht an eine Quelle übertragen werden, wodurch die inkrementelle Aktualisierung deaktiviert wird.
- Keine Zeilen im aktuellen Ersetzungsfenster wurden durch externe DML geändert. DML, die Zeilen außerhalb des aktuellen Fensters ändert, ist nicht betroffen.
- Das aktuelle Ersetzungsfenster enthält keine Zeilen, die das vorherige Prädikat ausgeschlossen hat. Wenn Sie das Prädikat so erweitern, dass es einen Bereich abdeckt, der zuvor nicht verarbeitet wurde, wird diese eine Aktualisierung als vollständige Neuberechnung ausgeführt. Nachfolgende Aktualisierungen sind wieder für die inkrementelle Aktualisierung berechtigt.
- Das Prädikat ist deterministisch. Prädikate, die nichtdeterministische Funktionen wie
rand()verwenden, deaktivieren die inkrementelle Aktualisierung. Temporalfunktionen wiecurrent_date()sind zulässig.
Die erste Aktualisierung eines Flusses ist immer eine vollständige Berechnung. Wenn eine Bedingung nicht erfüllt ist, greift diese Aktualisierung auf eine vollständige Neuberechnung des aktuellen Ersetzungsfensters zurück.
Bewährte Methoden für die inkrementelle Aktualisierung
Befolgen Sie diese Richtlinien, damit REPLACE WHERE-Flows weiterhin für die inkrementelle Aktualisierung geeignet sind.
Verwenden Sie eine bewegliche untere Grenze
Prädikate mit einer sich bewegenden unteren Grenze bleiben für die inkrementelle Aktualisierung auf unbestimmte Zeit berechtigt.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
Eine variable obere Grenze wie date BETWEEN date_add(current_date(), -7) AND current_date() kann dazu führen, dass sich das Fenster verschiebt und zuvor ausgeschlossene Zeilen einschließt, wodurch ein einmaliges Zurückfallen auf eine vollständige Neuberechnung ausgelöst wird.
Prädikatspalte in GROUP BY einbeziehen
Schließen Sie beim Aggregieren die Prädikatspalte ein GROUP BY , damit das Modul das Prädikat unterhalb der Aggregation pushen kann.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
Wenn die Spalte für das Prädikat in GROUP BY fehlt, kann das Prädikat nicht unterhalb der Aggregation verschoben werden, und die Datenquelle wird vollständig gescannt.
Einschließen der Prädikatspalte in Verknüpfungsschlüssel
Fügen Sie die Prädikatspalte in die Verknüpfungsbedingung ein, damit das Modul alle verknüpften Quellen löschen kann.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Wenn eine verknüpfte Tabelle die Prädikatspalte nicht verfügbar macht, wird diese Tabelle bei jeder Aktualisierung vollständig gescannt.
Fallback auf vollständige Neuberechnung diagnostizieren
Wenn eine Aktualisierung auf eine vollständige Neuberechnung zurückfällt, wird der Grund im planning_information-Ereignis des Flows gemeldet. Siehe Pipelineereignisprotokolle überwachen. In der folgenden Tabelle sind die im Ereignis gemeldeten Gründe aufgeführt:
| Grund | Meaning |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
Eine externe DML hat Zeilen im aktuellen Ersetzungsfenster geändert. |
REPLACE_WHERE_NOT_DETERMINISTIC |
Das Prädikat verwendet nicht-deterministische Ausdrücke. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
Die vorherige Aktualisierung hat ein nicht deterministisches Prädikat verwendet. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
Das Prädikat kann an keine Quelle weitergegeben werden, das aktuelle Fenster enthält Zeilen, die nicht vom vorherigen Prädikat verarbeitet wurden, oder die Ausführung verwendet eine Überschreibung des Prädikats. |
Einschränkungen
REPLACE WHERE-Flows weisen die folgenden Einschränkungen auf:
- Die Zieltabelle muss innerhalb der Pipeline erstellt werden.
- Pro Zieltabelle ist nur ein REPLACE WHERE Fluss zulässig.
- Eine Tabelle, die Ziel eines REPLACE-WHERE-Flow ist, kann nicht gleichzeitig auch Ziel eines anderen Flow-Typs sein, z. B. eines AUTO-CDC-Flow oder eines Append-Flow.
- Expectations werden für Tabellen, auf die sich REPLACE-WHERE-Flows beziehen, nicht unterstützt.
- Informationen zu Streamingtabellen, die in Databricks SQL erstellt wurden, finden Sie unter REPLACE-Flows für eigenständige StreamingtabellenWHERE, wenn Sie Informationen zu Unterschieden bei Syntax und Backfill suchen.
Beispiele
Die folgenden Beispiele zeigen allgemeine REPLACE-Flussmuster WHERE .
Beispiel 1: Historische Aggregate aus einer Quelle mit begrenzter Aufbewahrungsdauer beibehalten
Dieses Beispiel behält tägliche Aggregate unbegrenzt bei, selbst nachdem Rohdaten aufgrund der 3-tägigen Aufbewahrung aus der Quelltabelle herausfallen:
SQL
CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
return (
spark.read.table("events_raw")
.groupBy("date", "key")
.agg(F.sum("val").alias("agg"))
)
Beispiel 2: Verhindern der Neukompilierung, wenn sich eine Dimensionstabelle ändert
In diesem Beispiel bleiben historische Faktenzeilen unverändert, wenn sich Dimensionsattribute ändern:
SQL
CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
fact_table = spark.read.table("fact_table").alias("f")
dim_users = spark.read.table("dim_users").alias("d")
return (
fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
.select(
col("f.date"),
col("f.user_id"),
col("d.region"),
col("f.revenue"),
)
)
Wenn sich der Bereich eines Benutzers ändert, werden nur die zuletzt verwendeten Zeilen neu komputiert. Historische Einträge behalten den Regionswert zum Zeitpunkt bei, zu dem sie geschrieben wurden. Um historische Zeilen zu korrigieren, führen Sie ein gezieltes Rückfüllen mithilfe von Prädikatüberschreibungen aus.
Beispiel 3: Hinzufügen einer neuen Metrik, ohne den vollständigen Verlauf neu zu komputieren
In diesem Beispiel wird gezeigt, wie Sie eine Tabellendefinition entwickeln und nur einen gezielten Bereich ausfüllen:
Definieren Sie die anfängliche Tabelle:
SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Python
from pyspark import pipelines as dp from pyspark.sql import functions as F from pyspark.sql.functions import col @dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg(F.count("*").alias("clicks")) )Aktualisieren Sie die Abfrage, um
uniq_usershinzuzufügen:SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;Python
@dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg( F.count("*").alias("clicks"), F.countDistinct("user_id").alias("uniq_users"), ) )Füllen Sie die neue Metrik für die letzten 30 Tage zurück:
overrides = [ { "flow_name": "clickstream_daily", "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'", } ] resp = start_update_with_replace_where( pipeline_id="<pipeline-id>", replace_where_overrides=overrides, refresh_selection=["clickstream_daily"], )Zeilen, die älter als der rückwirkend aufgefüllte Bereich sind, enthalten
NULLfüruniq_users.
Beispiel 4: Ein kleines Zeitfenster durchlaufen, bevor der vollständige Verlauf nachgetragen wird
In diesem Beispiel wird gezeigt, wie Sie die Abfragelogik in einem kleinen Datenfenster überprüfen, bevor Sie den vollständigen Verlaufsbereich verarbeiten.
Beginnen Sie mit einem kurzen Fenster, sodass jede Aktualisierung nur die letzten 7 Tage neu komputet, während Sie die Abfrage überarbeiten:
SQL
CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
return (
spark.read.table("marketing_events")
.groupBy("event_date", "campaign_id")
.agg(F.sum("revenue").alias("total_revenue"))
)
Nachdem die Abfrage abgeschlossen wurde, verwenden Sie eine Prädikatüberschreibung, um ein einmaliges historisches Rückfüllen durchzuführen:
overrides = [
{
"flow_name": "revenue_attribution",
"predicate_override": "event_date >= date_add(current_date(), -365)",
}
]
resp = start_update_with_replace_where(
pipeline_id="<pipeline-id>",
replace_where_overrides=overrides,
refresh_selection=["revenue_attribution"],
)