Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ważna
Przepływy REPLACE WHERE dla autonomicznych tabel przesyłania strumieniowego znajdują się w wersji beta.
Na tej stronie opisano sposób użycia przepływów REPLACE WHERE w celu ponownego skompilowania i zastąpienia docelowego podzestawu autonomicznej tabeli przesyłania strumieniowego bez ponownego przetwarzania całej historii tabeli. Przepływy REPLACE WHERE obsługują dane, które docierają z opóźnieniem, ponowne przetwarzanie danych wejściowych, ewolucję schematu i uzupełnianie danych historycznych.
Za pomocą przepływu REPLACE WHERE zdefiniujesz predykat w tabeli docelowej. Wszystkie wiersze pasujące do predykatu są usuwane i zastępowane przez ponowne obliczanie zapytania źródłowego dla tego samego zakresu predykatu. Wiersze, które nie spełniają predykatu, pozostają bez zmian.
Wymagania
Przepływy REPLACE WHERE mają następujące wymagania:
- Tabela przesyłania strumieniowego musi używać kanału
PREVIEW. Zobaczchannelw temacie Konfiguracje potoków. - Databricks zaleca Unity Catalog i obliczenia bezserwerowe. Odświeżanie przyrostowe jest obsługiwane tylko w przypadku obliczeń bezserwerowych.
Kiedy należy używać przepływów REPLACE WHERE
Użyj przepływów REPLACE WHERE w następujących scenariuszach:
- Przyrostowe przetwarzanie wsadowe bez semantyki przesyłania strumieniowego: przetwarzaj nowe wiersze w partiach bez zarządzania pojęciami przesyłania strumieniowego, takimi jak znaki wodne.
- Ponowne przetwarzanie selektywne: Ponowne obliczanie tylko wierszy, które pasują do predykatu, pozostawiając wszystkie inne wiersze nietknięte.
-
Scenariusze wykraczające poza standardowe możliwości widoku zmaterializowanego:
- Tabele docelowe z dłuższym okresem przechowywania niż źródło
- Zapobieganie ponownej kompilacji w przypadku zmiany tabeli wymiarów
- Ewolucja schematu bez ponownego skompilowania całej historii
Utwórz przepływ REPLACE WHERE
Użyj klauzuli FLOW REPLACE WHERE w tekście wraz z CREATE OR REFRESH STREAMING TABLE:
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.channel = 'PREVIEW')
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Podczas odświeżania wszystkie wiersze w tabeli docelowej, które pasują do predykatu, są usuwane, zapytanie źródłowe zostanie ponownie skompilowane dla tego samego zakresu predykatu, a nowe wyniki zostaną wstawione. W tym przykładzie wszystkie wiersze z ostatnich 7 dni zostaną usunięte z orders_enriched i ponownie obliczone przy użyciu zapytania źródłowego.
Nie musisz dodawać predykatu do zapytania źródłowego. Silnik potoku automatycznie stosuje ją podczas odczytywania ze źródła.
Note
Ciąg BY NAME jest wymagany. Gwarantuje, że kolumny są dopasowywane według nazwy, a nie pozycji.
Wypełnianie danych historycznych
Aby przeprowadzić uzupełnianie danych, uruchom instrukcje DML bezpośrednio na tabeli docelowej:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
Działanie pełnego odświeżania
Pełne odświeżenie przepływu REPLACE WHERE ponownie wykonuje zapytanie źródłowe przy użyciu tylko bieżącego predykatu. Wiersze wstawione przez instrukcje DML poza bieżącym zakresem predykatu zostają trwale usunięte.
Ostrzeżenie
Pełne odświeżanie czyści wszystkie istniejące dane i ponownie wykonuje przepływ przy użyciu tylko zdefiniowanego predykatu. Jeśli potok działa przez rok z 7-dniowym predykatem, pełne odświeżanie powoduje wyświetlenie tabeli zawierającej tylko dane z ostatnich 7 dni. Wszystkie starsze wiersze są trwale usuwane.
REFRESH STREAMING TABLE orders_enriched FULL;
Aby zapobiec pełnym odświeżeniu w tabeli, ustaw właściwość pipelines.reset.allowed tabeli na :false
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.reset.allowed = 'false')
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
...
Odświeżanie przyrostowe
Przepływy REPLACE WHERE używają odświeżania przyrostowego, gdy jest to możliwe, ponownie przetwarzając tylko te dane źródłowe, które uległy zmianie od ostatniego odświeżenia, zamiast ponownie przeliczać całe okno zastępowania. Odświeżanie przyrostowe wymaga przetwarzania bezserwerowego.
Kiedy odświeżanie przyrostowe ma zastosowanie
Wszystkie poniższe warunki muszą być spełnione:
- Potok przetwarzania działa w środowisku bezserwerowym.
- Kształt zapytania jest obsługiwany. Zobacz Odświeżanie przyrostowe, aby uzyskać informacje o obsługiwanym zestawie operatorów.
- Predykat odwołuje się do kolumn podstawowych z tabeli źródłowej. Predykaty dla wartości pochodnych, takich jak dane wyjściowe funkcji agregacji lub okna, nie mogą być wypychane do źródła, co wyłącza odświeżanie przyrostowe.
- Żaden zewnętrzny kod DML nie zmodyfikował wierszy w bieżącym oknie zamiany. Instrukcje DML modyfikujące wiersze poza bieżącym oknem nie są tym objęte.
- Bieżące okno zamiany nie zawiera wierszy, które poprzedni predykat wykluczył. Jeśli poszerzysz predykat tak, aby objął zakres, który nie był wcześniej przetwarzany, to odświeżenie będzie wymagało pełnego przeliczenia. Kolejne odświeżenia ponownie kwalifikują się do odświeżania przyrostowego.
- Predykat jest deterministyczny. Predykaty używające funkcji niedeterministycznych, takich jak
rand(), wyłączają odświeżanie przyrostowe. Funkcje czasowe, takie jakcurrent_date()są dozwolone.
Pierwsze odświeżenie każdego przepływu jest zawsze pełnym obliczeniem. Jeśli którykolwiek warunek nie zostanie spełniony, następuje powrót do pełnego ponownego obliczenia bieżącego okna zastępowania.
Najlepsze rozwiązania dotyczące odświeżania przyrostowego
Postępuj zgodnie z tymi wytycznymi, aby przepływy REPLACE WHERE nadal kwalifikowały się do odświeżania przyrostowego.
Użyj ruchomej dolnej granicy
Predykaty z ruchomą dolną granicą kwalifikują się do odświeżania przyrostowego bezterminowo.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
Przesuwana górna granica, taka jak date BETWEEN date_add(current_date(), -7) AND current_date(), może przesunąć okno, aby uwzględnić wcześniej wykluczone wiersze, wyzwalając jednorazowy powrót do pełnej ponownej kompilacji.
Uwzględnij kolumnę predykatu w GROUP BY
Podczas agregacji uwzględnij kolumnę predykatu w GROUP BY, aby silnik mógł przesunąć predykat poniżej agregacji.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
Jeśli w GROUP BY brakuje kolumny predykatu, nie można przenieść predykatu poniżej agregacji i źródło jest skanowane w całości.
Uwzględnij kolumnę predykatu w kluczach łączenia
Uwzględnij kolumnę predykatu w warunku łączenia, aby silnik mógł odrzucić wszystkie dołączone źródła.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Jeśli tabela sprzężona nie uwidacznia kolumny predykatu, ta tabela jest skanowana w całości podczas każdego odświeżania.
Diagnozowanie przejścia na pełne ponowne obliczanie
Gdy odświeżanie wraca do pełnego ponownego obliczenia, przyczyna jest raportowana w zdarzeniu planning_information dla przepływu. Zobacz Monitorowanie dzienników zdarzeń potoku. W poniższej tabeli wymieniono przyczyny zgłoszone w zdarzeniu:
| Reason | Meaning |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
Zewnętrzna operacja DML zmodyfikowała wiersze w bieżącym oknie zastępowania. |
REPLACE_WHERE_NOT_DETERMINISTIC |
Predykat używa wyrażeń niedeterministycznych. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
Poprzednie odświeżanie używało predykatu niedeterministycznego. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
Predykatu nie można przesłać do żadnego źródła, bieżące okno zawiera wiersze, które nie zostały przetworzone przez poprzedni predykat, lub w ramach uruchomienia zastosowano zastąpienie predykatu. |
Examples
W poniższych przykładach przedstawiono typowe wzorce przepływu REPLACE WHERE .
Przykład 1. Zachowywanie agregacji historycznych ze źródła ograniczonego przechowywania
W tym przykładzie agregaty dzienne są przechowywane bezterminowo, nawet po wygaśnięciu surowych danych w tabeli źródłowej (3-dniowy okres przechowywania):
CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Przykład 2. Zapobieganie ponownej kompilacji w przypadku zmiany tabeli wymiarów
Ten przykład pozostawia historyczne wiersze w tabeli faktów bez zmian, gdy zmieniają się atrybuty wymiaru:
CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Jeśli region użytkownika ulegnie zmianie, tylko ostatnie wiersze zostaną ponownie skompilowane. Wiersze historyczne zachowują wartość regionu z momentu, w którym zostały zapisane.
Przykład 3. Dodawanie nowej metryki bez ponownego skompilowania pełnej historii
W tym przykładzie pokazano, jak rozwijać definicję tabeli i wypełniać tylko zakres docelowy:
Zdefiniuj początkową tabelę:
CREATE OR REFRESH STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Zaktualizuj zapytanie, aby dodać
uniq_users:CREATE OR REFRESH STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;Wiersze starsze niż 7-dniowe okno zawierają
NULLdlauniq_users.
Przykład 4: Iteracja w małym oknie przed uzupełnieniem pełnej historii
W tym przykładzie pokazano, jak zweryfikować logikę zapytań w małym oknie danych przed przetworzeniem pełnego zakresu historycznego.
Zacznij od krótkiego okna, aby zweryfikować metryki i iterować logikę biznesową przy niższych kosztach obliczeń:
CREATE OR REFRESH STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Krótki zakres czasowy przelicza ponownie tylko ostatnie 7 dni przy każdym odświeżeniu, więc przed uruchomieniem pełnego przeliczenia danych historycznych modyfikuj zapytanie tyle razy, ile potrzeba.
Po sfinalizowaniu zapytania użyj języka DML, aby wypełnić pełny zakres historyczny:
INSERT INTO revenue_attribution
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
WHERE event_date < date_add(current_date(), -7)
GROUP BY ALL;