Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Important
REPLACE-Abläufe WHERE für eigenständige Streaming-Tabellen sind in Beta.
Auf dieser Seite wird beschrieben, wie REPLACE WHERE Flows verwendet werden, um eine bestimmte Teilmenge einer eigenständigen Streaming-Tabelle neu zu berechnen und zu überschreiben, ohne den gesamten Tabellenverlauf neu zu verarbeiten. REPLACE-Abläufe WHERE verarbeiten spät ankommende Daten, vorgelagerte Neuverarbeitung, Schemaentwicklung und Rückfüllungen.
In einem REPLACE-Flow WHERE definieren Sie ein Prädikat für die Zieltabelle. Alle Zeilen, die dem Prädikat entsprechen, werden gelöscht und durch erneute Auswertung der Quellabfrage für denselben Prädikatbereich ersetzt. Zeilen, die nicht mit dem Prädikat übereinstimmen, bleiben unberührt.
Requirements
REPLACE-Flüsse WHERE weisen die folgenden Anforderungen auf:
- Ihre Streamingtabelle muss den
PREVIEWKanal verwenden. Siehechannelin Pipelinekonfigurationen. - Databricks empfiehlt Unity-Katalog und serverlose Berechnung. Die inkrementelle Aktualisierung wird nur auf serverloser Berechnung unterstützt.
Wann REPLACE-WHEREFlows verwendet werden sollten
Verwenden Sie REPLACE WHEREAbläufe für die folgenden Szenarien:
- Inkrementelle Batchverarbeitung ohne Streamingsemantik: Verarbeiten neuer Zeilen in Batches ohne Verwaltung von Streamingkonzepten wie Wasserzeichen.
- Selektive Neuverarbeitung: Kompieren Sie nur Zeilen, die einem Prädikat entsprechen, während alle anderen Zeilen unverändert bleiben.
-
Szenarien, die über standardmäßige materialisierte Ansichtsfunktionen hinausgehen:
- Zieltabellen mit längerer Aufbewahrung als die Quelle
- Verhindern der Neukompilierung, wenn sich eine Dimensionstabelle ändert
- Schema-Evolution ohne Neuberechnung der gesamten Historie
Erstellen eines REPLACE-Flusses WHERE
Verwenden Sie die Klausel FLOW REPLACE WHERE inline mit CREATE OR REFRESH STREAMING TABLE:
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.channel = 'PREVIEW')
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Während der Aktualisierung werden alle Zeilen in der Zieltabelle, die mit dem Prädikat übereinstimmen, gelöscht, die Quellabfrage für denselben Prädikatbereich neu komputiert, und die neuen Ergebnisse werden eingefügt. In diesem Beispiel werden alle Zeilen der letzten 7 Tage aus orders_enriched gelöscht und mithilfe der Quellabfrage neu berechnet.
Sie müssen das Prädikat nicht zur Quellabfrage hinzufügen. Die Pipeline-Engine wendet es automatisch an, wenn Daten aus der Quelle gelesen werden.
Note
BY NAME ist erforderlich. Es stellt sicher, dass Spalten nach Namen und nicht nach Position abgeglichen werden.
Zurückfüllen von historischen Daten
Führen Sie ZUM Ausführen von Rückfüllungen DML-Anweisungen direkt in der Zieltabelle aus:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
Vollständiges Aktualisierungsverhalten
Eine vollständige Aktualisierung eines REPLACE WHERE-Flows führt die Quellabfrage unter ausschließlicher Verwendung des aktuellen Prädikats erneut aus. Zeilen, die von DML-Anweisungen außerhalb des aktuellen Prädikatbereichs eingefügt wurden, werden endgültig gelöscht.
Warning
Eine vollständige Aktualisierung löscht alle vorhandenen Daten und führt den Fluss nur mithilfe des definierten Prädikats erneut aus. Wenn eine Pipeline ein Jahr lang mit einem 7-Tage-Prädikat ausgeführt wurde, führt eine vollständige Aktualisierung dazu, dass die Tabelle nur die Daten der letzten 7 Tage enthält. Alle älteren Zeilen werden endgültig gelöscht.
REFRESH STREAMING TABLE orders_enriched FULL;
Um vollständige Aktualisierungen einer Tabelle zu verhindern, legen Sie die Tabelleneigenschaft pipelines.reset.allowed auf :false
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.reset.allowed = 'false')
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
...
Schrittweise Aktualisierung
REPLACE-Flüsse WHERE verwenden nach Möglichkeit eine inkrementelle Aktualisierung und verarbeiten nur die Quelldaten, die seit der letzten Aktualisierung geändert wurden, anstatt das gesamte Ersetzungsfenster neu zu komputieren. Für die inkrementelle Aktualisierung ist eine serverlose Berechnung erforderlich.
Wann die inkrementelle Aktualisierung gilt
Alle folgenden Punkte müssen wahr sein:
- Die Pipeline läuft auf serverloser Recheninfrastruktur.
- Die Abfragestruktur wird unterstützt. Siehe inkrementelle Aktualisierung für den unterstützten Operatorsatz.
- Das Prädikat verweist auf Basisspalten aus einer Quelltabelle. Prädikate für abgeleitete Werte, z. B. Aggregat- oder Fensterfunktionsausgaben, können nicht an eine Quelle übertragen werden, wodurch die inkrementelle Aktualisierung deaktiviert wird.
- Keine Zeilen im aktuellen Ersetzungsfenster wurden durch externe DML geändert. DML, die Zeilen außerhalb des aktuellen Fensters ändert, ist nicht betroffen.
- Das aktuelle Ersetzungsfenster enthält keine Zeilen, die das vorherige Prädikat ausgeschlossen hat. Wenn Sie das Prädikat so erweitern, dass es einen Bereich abdeckt, der zuvor nicht verarbeitet wurde, wird diese eine Aktualisierung als vollständige Neuberechnung ausgeführt. Nachfolgende Aktualisierungen sind wieder für die inkrementelle Aktualisierung berechtigt.
- Das Prädikat ist deterministisch. Prädikate, die nichtdeterministische Funktionen wie
rand()verwenden, deaktivieren die inkrementelle Aktualisierung. Temporalfunktionen wiecurrent_date()sind zulässig.
Die erste Aktualisierung eines Flusses ist immer eine vollständige Berechnung. Wenn eine Bedingung nicht erfüllt ist, greift diese Aktualisierung auf eine vollständige Neuberechnung des aktuellen Ersetzungsfensters zurück.
Bewährte Methoden für die inkrementelle Aktualisierung
Befolgen Sie diese Richtlinien, damit REPLACE WHERE-Flows weiterhin für die inkrementelle Aktualisierung geeignet sind.
Verwenden Sie eine bewegliche untere Grenze
Prädikate mit einer sich bewegenden unteren Grenze bleiben für die inkrementelle Aktualisierung auf unbestimmte Zeit berechtigt.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
Eine variable obere Grenze wie date BETWEEN date_add(current_date(), -7) AND current_date() kann dazu führen, dass sich das Fenster verschiebt und zuvor ausgeschlossene Zeilen einschließt, wodurch ein einmaliges Zurückfallen auf eine vollständige Neuberechnung ausgelöst wird.
Prädikatspalte in GROUP BY einbeziehen
Schließen Sie beim Aggregieren die Prädikatspalte ein GROUP BY , damit das Modul das Prädikat unterhalb der Aggregation pushen kann.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
Wenn die Spalte für das Prädikat in GROUP BY fehlt, kann das Prädikat nicht unterhalb der Aggregation verschoben werden, und die Datenquelle wird vollständig gescannt.
Einschließen der Prädikatspalte in Verknüpfungsschlüssel
Fügen Sie die Prädikatspalte in die Verknüpfungsbedingung ein, damit das Modul alle verknüpften Quellen löschen kann.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Wenn eine verknüpfte Tabelle die Prädikatspalte nicht verfügbar macht, wird diese Tabelle bei jeder Aktualisierung vollständig gescannt.
Fallback auf vollständige Neuberechnung diagnostizieren
Wenn eine Aktualisierung auf eine vollständige Neuberechnung zurückfällt, wird der Grund im planning_information-Ereignis des Flows gemeldet. Siehe Pipelineereignisprotokolle überwachen. In der folgenden Tabelle sind die im Ereignis gemeldeten Gründe aufgeführt:
| Grund | Meaning |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
Eine externe DML hat Zeilen im aktuellen Ersetzungsfenster geändert. |
REPLACE_WHERE_NOT_DETERMINISTIC |
Das Prädikat verwendet nicht-deterministische Ausdrücke. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
Die vorherige Aktualisierung hat ein nicht deterministisches Prädikat verwendet. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
Das Prädikat kann an keine Quelle weitergegeben werden, das aktuelle Fenster enthält Zeilen, die nicht vom vorherigen Prädikat verarbeitet wurden, oder die Ausführung verwendet eine Überschreibung des Prädikats. |
Beispiele
Die folgenden Beispiele zeigen allgemeine REPLACE-Flussmuster WHERE .
Beispiel 1: Historische Aggregate aus einer Quelle mit begrenzter Aufbewahrungsdauer beibehalten
Dieses Beispiel behält tägliche Aggregate unbegrenzt bei, selbst nachdem Rohdaten aufgrund der 3-tägigen Aufbewahrung aus der Quelltabelle herausfallen:
CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Beispiel 2: Verhindern der Neukompilierung, wenn sich eine Dimensionstabelle ändert
In diesem Beispiel bleiben historische Faktenzeilen unverändert, wenn sich Dimensionsattribute ändern:
CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Wenn sich der Bereich eines Benutzers ändert, werden nur die zuletzt verwendeten Zeilen neu komputiert. Historische Einträge behalten den Regionswert zum Zeitpunkt bei, zu dem sie geschrieben wurden.
Beispiel 3: Hinzufügen einer neuen Metrik, ohne den vollständigen Verlauf neu zu komputieren
In diesem Beispiel wird gezeigt, wie Sie eine Tabellendefinition entwickeln und nur einen gezielten Bereich ausfüllen:
Definieren Sie die anfängliche Tabelle:
CREATE OR REFRESH STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Aktualisieren Sie die Abfrage, um
uniq_usershinzuzufügen:CREATE OR REFRESH STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;Zeilen, die älter als das 7-Tage-Fenster sind, enthalten
NULLfüruniq_users.
Beispiel 4: Ein kleines Zeitfenster durchlaufen, bevor der vollständige Verlauf nachgetragen wird
In diesem Beispiel wird gezeigt, wie Sie die Abfragelogik in einem kleinen Datenfenster überprüfen, bevor Sie den vollständigen Verlaufsbereich verarbeiten.
Beginnen Sie mit einem kurzen Zeitraum, um Metriken zu validieren und an der Geschäftslogik bei geringeren Rechenkosten zu iterieren:
CREATE OR REFRESH STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Bei einem kurzen Zeitraum werden bei jeder Aktualisierung nur die letzten 7 Tage neu berechnet. Überarbeiten Sie die Abfrage also so oft wie nötig, bevor Sie einen vollständigen historischen Durchlauf starten.
Sobald die Abfrage finalisiert ist, verwenden Sie DML, um den gesamten historischen Datenbereich rückwirkend aufzufüllen:
INSERT INTO revenue_attribution
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
WHERE event_date < date_add(current_date(), -7)
GROUP BY ALL;