Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ważna
Przepływy REPLACE WHERE znajdują się w wersji beta.
Na tej stronie opisano, jak używać przepływów REPLACE WHERE w Lakeflow Spark Declarative Pipelines do ponownego obliczania i nadpisywania określonego podzbioru tabeli bez ponownego przetwarzania całej historii tej tabeli. Przepływy REPLACE WHERE obsługują dane, które docierają z opóźnieniem, ponowne przetwarzanie danych wejściowych, ewolucję schematu i uzupełnianie danych historycznych.
Za pomocą przepływu REPLACE WHERE zdefiniujesz predykat w tabeli docelowej. Wszystkie wiersze pasujące do predykatu są usuwane i zastępowane przez ponowne obliczanie zapytania źródłowego dla tego samego zakresu predykatu. Wiersze, które nie pasują do predykatu, pozostają niezmienione.
Requirements
Przepływy REPLACE WHERE mają następujące wymagania:
- Twój potok przetwarzania musi korzystać z kanału
PREVIEW. - Databricks zaleca Unity Catalog i obliczenia bezserwerowe. Odświeżanie przyrostowe jest obsługiwane tylko w przypadku obliczeń bezserwerowych.
Kiedy należy używać przepływów REPLACE WHERE
Użyj przepływów REPLACE WHERE w następujących scenariuszach:
- Przyrostowe przetwarzanie wsadowe bez semantyki przesyłania strumieniowego: przetwarzaj nowe wiersze w partiach bez zarządzania pojęciami przesyłania strumieniowego, takimi jak znaki wodne.
- Ponowne przetwarzanie selektywne: Ponowne obliczanie tylko wierszy, które pasują do predykatu, pozostawiając wszystkie inne wiersze nietknięte.
-
Scenariusze wykraczające poza standardowe możliwości widoku zmaterializowanego:
- Tabele docelowe z dłuższym okresem przechowywania niż źródło
- Zapobieganie ponownej kompilacji w przypadku zmiany tabeli wymiarów
- Ewolucja schematu bez ponownego skompilowania całej historii
Utwórz przepływ REPLACE WHERE
Zdefiniuj przepływy REPLACE WHERE w języku SQL lub Python.
SQL
Użyj klauzuli FLOW REPLACE WHERE w tekście wraz z CREATE STREAMING TABLE:
CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Alternatywnie należy użyć składni długiej:CREATE FLOW
CREATE STREAMING TABLE orders_enriched;
CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Python
W Python tabela i przepływ są definiowane w jednej instrukcji. Przepływ dziedziczy taką samą nazwę jak tabela:
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
product_dim = spark.read.table("product_dim")
return orders_fct.join(product_dim, "product_id")
Parametr replace_where akceptuje wyrażenie kolumny PySpark lub predykat ciągu.
W tych przykładach wszystkie wiersze z ostatnich 7 dni są usuwane z orders_enriched i ponownie obliczane przy użyciu zapytania źródłowego. Nie musisz dodawać predykatu do zapytania źródłowego. Silnik potoku automatycznie stosuje ją podczas odczytywania ze źródła.
Note
BY NAME jest wymagany w języku SQL. Dopasowuje kolumny według nazw, a nie pozycji.
Wypełnianie danych historycznych
Aby zapisać historyczne lub poprawione wiersze do tabeli docelowej poza zaplanowanym odświeżaniem, wybierz między dwoma mechanizmami w zależności od tego, gdzie istnieją dane historyczne:
- Nadpisania predykatu: Uruchom ponownie zapytanie źródłowe przepływu dla jednorazowego zakresu wartości predykatu. Użyj, gdy dane historyczne pochodzą z tego samego źródła co dane przyrostowe.
- Instrukcje DML: wstaw bezpośrednio do tabeli docelowej, pomijając przepływ. Użyj, gdy dane historyczne znajdują się w innym źródle danych niż dane przyrostowe.
Przesłonięcia predykatu
Nadpisz predykat REPLACE WHERE na potrzeby pojedynczej aktualizacji potoku bez modyfikowania definicji potoku. Przesłonięcia predykatu są jednorazowe, mają zastosowanie tylko do bieżącej aktualizacji i nie mają wpływu na przyszłe uruchomienia.
Przykład: początkowe obciążenie historyczne
Aby wykonać jednorazowe wypełnianie danych historycznych podczas pierwszego konfigurowania potoku:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
)
print(resp)
Przykład: poprawianie kolumny dla określonego okresu
Po zaktualizowaniu definicji kolumny uzupełnij zmiany dla docelowego zakresu historycznego:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30)",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
refresh_selection=["orders_enriched"],
)
print(resp)
Połącz wiele wymiarów w jednym zastąpieniu predykatu:
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
}
]
Funkcja pomocnika: start_update_with_replace_where
Użyj interfejsu API aktualizacji potoku z poziomu notesu, aby przesłać nadpisania predykatów:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse
def start_update_with_replace_where(
pipeline_id: str,
replace_where_overrides: list[dict],
refresh_selection: list[str] = None,
) -> StartUpdateResponse:
"""Start a pipeline update with REPLACE WHERE predicate overrides."""
client = WorkspaceClient()
body = {
"pipeline_id": pipeline_id,
"cause": "JOB_TASK",
"update_cause_details": {
"job_details": {"performance_target": "PERFORMANCE"}
},
"replace_where_overrides": replace_where_overrides,
}
if refresh_selection:
body["refresh_selection"] = refresh_selection
res = client.api_client.do(
"POST",
f"/api/2.0/pipelines/{pipeline_id}/updates",
body=body,
headers={"Accept": "application/json", "Content-Type": "application/json"},
)
return StartUpdateResponse.from_dict(res)
Instrukcje DML
Wykonaj instrukcje DML bezpośrednio na tabeli docelowej poza potokiem, aby przeprowadzić wstępne ładowanie danych lub wprowadzić poprawki, na przykład ładując dane z tabeli starszego systemu:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
Wiersze wstawione za pomocą języka DML nie podlegają predykatowi REPLACE WHERE i są zachowywane podczas zaplanowanych odświeżeń, chyba że znajdą się w zakresie predykatu w przyszłym uruchomieniu.
Działanie pełnego odświeżania
Pełne odświeżenie przepływu REPLACE WHERE ponownie wykonuje zapytanie źródłowe przy użyciu tylko bieżącego predykatu. Wiersze wstawione przez przesłonięcia predykatu lub instrukcje DML poza bieżącym zakresem predykatu są trwale usuwane.
Ostrzeżenie
Pełne odświeżanie czyści wszystkie istniejące dane i ponownie wykonuje przepływ przy użyciu tylko zdefiniowanego predykatu. Jeśli potok działa przez rok z 7-dniowym predykatem, pełne odświeżanie powoduje wyświetlenie tabeli zawierającej tylko dane z ostatnich 7 dni. Wszystkie starsze wiersze są trwale usuwane.
Aby zapobiec pełnym odświeżeniom tabeli, ustaw właściwość tabeli pipelines.reset.allowed na wartość false. Zobacz Dokumentacja właściwości potoku.
Odświeżanie przyrostowe
Przepływy REPLACE WHERE używają odświeżania przyrostowego, gdy jest to możliwe, ponownie przetwarzając tylko te dane źródłowe, które uległy zmianie od ostatniego odświeżenia, zamiast ponownie przeliczać całe okno zastępowania. Odświeżanie przyrostowe wymaga przetwarzania bezserwerowego.
Kiedy odświeżanie przyrostowe ma zastosowanie
Wszystkie poniższe warunki muszą być spełnione:
- Potok przetwarzania działa w środowisku bezserwerowym.
- Kształt zapytania jest obsługiwany. Zobacz Odświeżanie przyrostowe, aby uzyskać informacje o obsługiwanym zestawie operatorów.
- Predykat odwołuje się do kolumn podstawowych z tabeli źródłowej. Predykaty dla wartości pochodnych, takich jak dane wyjściowe funkcji agregacji lub okna, nie mogą być wypychane do źródła, co wyłącza odświeżanie przyrostowe.
- Żaden zewnętrzny kod DML nie zmodyfikował wierszy w bieżącym oknie zamiany. Instrukcje DML modyfikujące wiersze poza bieżącym oknem nie są tym objęte.
- Bieżące okno zamiany nie zawiera wierszy, które poprzedni predykat wykluczył. Jeśli poszerzysz predykat tak, aby objął zakres, który nie był wcześniej przetwarzany, to odświeżenie będzie wymagało pełnego przeliczenia. Kolejne odświeżenia ponownie kwalifikują się do odświeżania przyrostowego.
- Predykat jest deterministyczny. Predykaty używające funkcji niedeterministycznych, takich jak
rand(), wyłączają odświeżanie przyrostowe. Funkcje czasowe, takie jakcurrent_date()są dozwolone.
Pierwsze odświeżenie każdego przepływu jest zawsze pełnym obliczeniem. Jeśli którykolwiek warunek nie zostanie spełniony, następuje powrót do pełnego ponownego obliczenia bieżącego okna zastępowania.
Najlepsze rozwiązania dotyczące odświeżania przyrostowego
Postępuj zgodnie z tymi wytycznymi, aby przepływy REPLACE WHERE nadal kwalifikowały się do odświeżania przyrostowego.
Użyj ruchomej dolnej granicy
Predykaty z ruchomą dolną granicą kwalifikują się do odświeżania przyrostowego bezterminowo.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
Przesuwana górna granica, taka jak date BETWEEN date_add(current_date(), -7) AND current_date(), może przesunąć okno, aby uwzględnić wcześniej wykluczone wiersze, wyzwalając jednorazowy powrót do pełnej ponownej kompilacji.
Uwzględnij kolumnę predykatu w GROUP BY
Podczas agregacji uwzględnij kolumnę predykatu w GROUP BY, aby silnik mógł przesunąć predykat poniżej agregacji.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
Jeśli w GROUP BY brakuje kolumny predykatu, nie można przenieść predykatu poniżej agregacji i źródło jest skanowane w całości.
Uwzględnij kolumnę predykatu w kluczach łączenia
Uwzględnij kolumnę predykatu w warunku łączenia, aby silnik mógł odrzucić wszystkie dołączone źródła.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Jeśli tabela sprzężona nie uwidacznia kolumny predykatu, ta tabela jest skanowana w całości podczas każdego odświeżania.
Diagnozowanie przejścia na pełne ponowne obliczanie
Gdy odświeżanie wraca do pełnego ponownego obliczenia, przyczyna jest raportowana w zdarzeniu planning_information dla przepływu. Zobacz Monitorowanie dzienników zdarzeń potoku. W poniższej tabeli wymieniono przyczyny zgłoszone w zdarzeniu:
| Reason | Meaning |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
Zewnętrzna operacja DML zmodyfikowała wiersze w bieżącym oknie zastępowania. |
REPLACE_WHERE_NOT_DETERMINISTIC |
Predykat używa wyrażeń niedeterministycznych. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
Poprzednie odświeżanie używało predykatu niedeterministycznego. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
Predykatu nie można przesłać do żadnego źródła, bieżące okno zawiera wiersze, które nie zostały przetworzone przez poprzedni predykat, lub w ramach uruchomienia zastosowano zastąpienie predykatu. |
Ograniczenia
Przepływy REPLACE WHERE mają następujące ograniczenia:
- Tabela docelowa musi zostać utworzona w potoku.
- Tylko jeden przepływ REPLACE WHERE jest dozwolony dla tabeli docelowej.
- Tabela, która jest celem przepływu REPLACE WHERE, nie może być także celem innego typu przepływu, takiego jak przepływ AUTO CDC lub przepływ dopisywania.
- Oczekiwania nie są obsługiwane w tabelach objętych przepływami REPLACE WHERE .
- Informacje o tabelach strumieniowych utworzonych w Databricks SQL można znaleźć tutaj: REPLACE WHERE flows for standalone streaming tables; opisano tam składnię i różnice dotyczące uzupełniania danych historycznych.
Examples
W poniższych przykładach przedstawiono typowe wzorce przepływu REPLACE WHERE .
Przykład 1. Zachowywanie agregacji historycznych ze źródła ograniczonego przechowywania
W tym przykładzie agregaty dzienne są przechowywane bezterminowo, nawet po wygaśnięciu surowych danych w tabeli źródłowej (3-dniowy okres przechowywania):
SQL
CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
return (
spark.read.table("events_raw")
.groupBy("date", "key")
.agg(F.sum("val").alias("agg"))
)
Przykład 2. Zapobieganie ponownej kompilacji w przypadku zmiany tabeli wymiarów
Ten przykład pozostawia historyczne wiersze w tabeli faktów bez zmian, gdy zmieniają się atrybuty wymiaru:
SQL
CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
fact_table = spark.read.table("fact_table").alias("f")
dim_users = spark.read.table("dim_users").alias("d")
return (
fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
.select(
col("f.date"),
col("f.user_id"),
col("d.region"),
col("f.revenue"),
)
)
Jeśli region użytkownika ulegnie zmianie, tylko ostatnie wiersze zostaną ponownie skompilowane. Wiersze historyczne zachowują wartość regionu z momentu, w którym zostały zapisane. Aby skorygować historyczne wiersze, uruchom ukierunkowane uzupełnienie danych przy użyciu nadpisań predykatu.
Przykład 3. Dodawanie nowej metryki bez ponownego skompilowania pełnej historii
W tym przykładzie pokazano, jak rozwijać definicję tabeli i wypełniać tylko zakres docelowy:
Zdefiniuj początkową tabelę:
SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Python
from pyspark import pipelines as dp from pyspark.sql import functions as F from pyspark.sql.functions import col @dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg(F.count("*").alias("clicks")) )Zaktualizuj zapytanie, aby dodać
uniq_users:SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;Python
@dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg( F.count("*").alias("clicks"), F.countDistinct("user_id").alias("uniq_users"), ) )Uzupełnij nową metrykę za ostatnie 30 dni:
overrides = [ { "flow_name": "clickstream_daily", "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'", } ] resp = start_update_with_replace_where( pipeline_id="<pipeline-id>", replace_where_overrides=overrides, refresh_selection=["clickstream_daily"], )Wiersze starsze niż zakres uzupełniony wstecz zawierają
NULLdlauniq_users.
Przykład 4: Iteracja w małym oknie przed uzupełnieniem pełnej historii
W tym przykładzie pokazano, jak zweryfikować logikę zapytań w małym oknie danych przed przetworzeniem pełnego zakresu historycznego.
Rozpocznij od krótkiego okna, aby podczas modyfikowania zapytania każde odświeżenie przeliczało tylko ostatnie 7 dni:
SQL
CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
return (
spark.read.table("marketing_events")
.groupBy("event_date", "campaign_id")
.agg(F.sum("revenue").alias("total_revenue"))
)
Po sfinalizowaniu zapytania użyj nadpisania predykatu, aby jednorazowo uzupełnić dane historyczne:
overrides = [
{
"flow_name": "revenue_attribution",
"predicate_override": "event_date >= date_add(current_date(), -365)",
}
]
resp = start_update_with_replace_where(
pipeline_id="<pipeline-id>",
replace_where_overrides=overrides,
refresh_selection=["revenue_attribution"],
)