Przetwarzanie wsadowe za pomocą przepływów REPLACE WHERE

Ważna

Przepływy REPLACE WHERE znajdują się w wersji beta.

Na tej stronie opisano, jak używać przepływów REPLACE WHERE w Lakeflow Spark Declarative Pipelines do ponownego obliczania i nadpisywania określonego podzbioru tabeli bez ponownego przetwarzania całej historii tej tabeli. Przepływy REPLACE WHERE obsługują dane, które docierają z opóźnieniem, ponowne przetwarzanie danych wejściowych, ewolucję schematu i uzupełnianie danych historycznych.

Za pomocą przepływu REPLACE WHERE zdefiniujesz predykat w tabeli docelowej. Wszystkie wiersze pasujące do predykatu są usuwane i zastępowane przez ponowne obliczanie zapytania źródłowego dla tego samego zakresu predykatu. Wiersze, które nie pasują do predykatu, pozostają niezmienione.

Requirements

Przepływy REPLACE WHERE mają następujące wymagania:

  • Twój potok przetwarzania musi korzystać z kanału PREVIEW.
  • Databricks zaleca Unity Catalog i obliczenia bezserwerowe. Odświeżanie przyrostowe jest obsługiwane tylko w przypadku obliczeń bezserwerowych.

Kiedy należy używać przepływów REPLACE WHERE

Użyj przepływów REPLACE WHERE w następujących scenariuszach:

  • Przyrostowe przetwarzanie wsadowe bez semantyki przesyłania strumieniowego: przetwarzaj nowe wiersze w partiach bez zarządzania pojęciami przesyłania strumieniowego, takimi jak znaki wodne.
  • Ponowne przetwarzanie selektywne: Ponowne obliczanie tylko wierszy, które pasują do predykatu, pozostawiając wszystkie inne wiersze nietknięte.
  • Scenariusze wykraczające poza standardowe możliwości widoku zmaterializowanego:
    • Tabele docelowe z dłuższym okresem przechowywania niż źródło
    • Zapobieganie ponownej kompilacji w przypadku zmiany tabeli wymiarów
    • Ewolucja schematu bez ponownego skompilowania całej historii

Utwórz przepływ REPLACE WHERE

Zdefiniuj przepływy REPLACE WHERE w języku SQL lub Python.

SQL

Użyj klauzuli FLOW REPLACE WHERE w tekście wraz z CREATE STREAMING TABLE:

CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Alternatywnie należy użyć składni długiej:CREATE FLOW

CREATE STREAMING TABLE orders_enriched;

CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Python

W Python tabela i przepływ są definiowane w jednej instrukcji. Przepływ dziedziczy taką samą nazwę jak tabela:

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
  orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
  product_dim = spark.read.table("product_dim")
  return orders_fct.join(product_dim, "product_id")

Parametr replace_where akceptuje wyrażenie kolumny PySpark lub predykat ciągu.

W tych przykładach wszystkie wiersze z ostatnich 7 dni są usuwane z orders_enriched i ponownie obliczane przy użyciu zapytania źródłowego. Nie musisz dodawać predykatu do zapytania źródłowego. Silnik potoku automatycznie stosuje ją podczas odczytywania ze źródła.

Note

BY NAME jest wymagany w języku SQL. Dopasowuje kolumny według nazw, a nie pozycji.

Wypełnianie danych historycznych

Aby zapisać historyczne lub poprawione wiersze do tabeli docelowej poza zaplanowanym odświeżaniem, wybierz między dwoma mechanizmami w zależności od tego, gdzie istnieją dane historyczne:

  • Nadpisania predykatu: Uruchom ponownie zapytanie źródłowe przepływu dla jednorazowego zakresu wartości predykatu. Użyj, gdy dane historyczne pochodzą z tego samego źródła co dane przyrostowe.
  • Instrukcje DML: wstaw bezpośrednio do tabeli docelowej, pomijając przepływ. Użyj, gdy dane historyczne znajdują się w innym źródle danych niż dane przyrostowe.

Przesłonięcia predykatu

Nadpisz predykat REPLACE WHERE na potrzeby pojedynczej aktualizacji potoku bez modyfikowania definicji potoku. Przesłonięcia predykatu są jednorazowe, mają zastosowanie tylko do bieżącej aktualizacji i nie mają wpływu na przyszłe uruchomienia.

Przykład: początkowe obciążenie historyczne

Aby wykonać jednorazowe wypełnianie danych historycznych podczas pierwszego konfigurowania potoku:

pipeline_id = "<pipeline-id>"
overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
  }
]

resp = start_update_with_replace_where(
  pipeline_id=pipeline_id,
  replace_where_overrides=overrides,
)
print(resp)

Przykład: poprawianie kolumny dla określonego okresu

Po zaktualizowaniu definicji kolumny uzupełnij zmiany dla docelowego zakresu historycznego:

pipeline_id = "<pipeline-id>"
overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date >= date_add(current_date(), -30)",
  }
]

resp = start_update_with_replace_where(
  pipeline_id=pipeline_id,
  replace_where_overrides=overrides,
  refresh_selection=["orders_enriched"],
)
print(resp)

Połącz wiele wymiarów w jednym zastąpieniu predykatu:

overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
  }
]
Funkcja pomocnika: start_update_with_replace_where

Użyj interfejsu API aktualizacji potoku z poziomu notesu, aby przesłać nadpisania predykatów:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse


def start_update_with_replace_where(
  pipeline_id: str,
  replace_where_overrides: list[dict],
  refresh_selection: list[str] = None,
) -> StartUpdateResponse:
  """Start a pipeline update with REPLACE WHERE predicate overrides."""
  client = WorkspaceClient()

  body = {
    "pipeline_id": pipeline_id,
    "cause": "JOB_TASK",
    "update_cause_details": {
      "job_details": {"performance_target": "PERFORMANCE"}
    },
    "replace_where_overrides": replace_where_overrides,
  }

  if refresh_selection:
    body["refresh_selection"] = refresh_selection

  res = client.api_client.do(
    "POST",
    f"/api/2.0/pipelines/{pipeline_id}/updates",
    body=body,
    headers={"Accept": "application/json", "Content-Type": "application/json"},
  )

  return StartUpdateResponse.from_dict(res)

Instrukcje DML

Wykonaj instrukcje DML bezpośrednio na tabeli docelowej poza potokiem, aby przeprowadzić wstępne ładowanie danych lub wprowadzić poprawki, na przykład ładując dane z tabeli starszego systemu:

INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

Wiersze wstawione za pomocą języka DML nie podlegają predykatowi REPLACE WHERE i są zachowywane podczas zaplanowanych odświeżeń, chyba że znajdą się w zakresie predykatu w przyszłym uruchomieniu.

Działanie pełnego odświeżania

Pełne odświeżenie przepływu REPLACE WHERE ponownie wykonuje zapytanie źródłowe przy użyciu tylko bieżącego predykatu. Wiersze wstawione przez przesłonięcia predykatu lub instrukcje DML poza bieżącym zakresem predykatu są trwale usuwane.

Ostrzeżenie

Pełne odświeżanie czyści wszystkie istniejące dane i ponownie wykonuje przepływ przy użyciu tylko zdefiniowanego predykatu. Jeśli potok działa przez rok z 7-dniowym predykatem, pełne odświeżanie powoduje wyświetlenie tabeli zawierającej tylko dane z ostatnich 7 dni. Wszystkie starsze wiersze są trwale usuwane.

Aby zapobiec pełnym odświeżeniom tabeli, ustaw właściwość tabeli pipelines.reset.allowed na wartość false. Zobacz Dokumentacja właściwości potoku.

Odświeżanie przyrostowe

Przepływy REPLACE WHERE używają odświeżania przyrostowego, gdy jest to możliwe, ponownie przetwarzając tylko te dane źródłowe, które uległy zmianie od ostatniego odświeżenia, zamiast ponownie przeliczać całe okno zastępowania. Odświeżanie przyrostowe wymaga przetwarzania bezserwerowego.

Kiedy odświeżanie przyrostowe ma zastosowanie

Wszystkie poniższe warunki muszą być spełnione:

  • Potok przetwarzania działa w środowisku bezserwerowym.
  • Kształt zapytania jest obsługiwany. Zobacz Odświeżanie przyrostowe, aby uzyskać informacje o obsługiwanym zestawie operatorów.
  • Predykat odwołuje się do kolumn podstawowych z tabeli źródłowej. Predykaty dla wartości pochodnych, takich jak dane wyjściowe funkcji agregacji lub okna, nie mogą być wypychane do źródła, co wyłącza odświeżanie przyrostowe.
  • Żaden zewnętrzny kod DML nie zmodyfikował wierszy w bieżącym oknie zamiany. Instrukcje DML modyfikujące wiersze poza bieżącym oknem nie są tym objęte.
  • Bieżące okno zamiany nie zawiera wierszy, które poprzedni predykat wykluczył. Jeśli poszerzysz predykat tak, aby objął zakres, który nie był wcześniej przetwarzany, to odświeżenie będzie wymagało pełnego przeliczenia. Kolejne odświeżenia ponownie kwalifikują się do odświeżania przyrostowego.
  • Predykat jest deterministyczny. Predykaty używające funkcji niedeterministycznych, takich jak rand(), wyłączają odświeżanie przyrostowe. Funkcje czasowe, takie jak current_date() są dozwolone.

Pierwsze odświeżenie każdego przepływu jest zawsze pełnym obliczeniem. Jeśli którykolwiek warunek nie zostanie spełniony, następuje powrót do pełnego ponownego obliczenia bieżącego okna zastępowania.

Najlepsze rozwiązania dotyczące odświeżania przyrostowego

Postępuj zgodnie z tymi wytycznymi, aby przepływy REPLACE WHERE nadal kwalifikowały się do odświeżania przyrostowego.

Użyj ruchomej dolnej granicy

Predykaty z ruchomą dolną granicą kwalifikują się do odświeżania przyrostowego bezterminowo.

FLOW REPLACE WHERE date >= date_add(current_date(), -7)

Przesuwana górna granica, taka jak date BETWEEN date_add(current_date(), -7) AND current_date(), może przesunąć okno, aby uwzględnić wcześniej wykluczone wiersze, wyzwalając jednorazowy powrót do pełnej ponownej kompilacji.

Uwzględnij kolumnę predykatu w GROUP BY

Podczas agregacji uwzględnij kolumnę predykatu w GROUP BY, aby silnik mógł przesunąć predykat poniżej agregacji.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;

Jeśli w GROUP BY brakuje kolumny predykatu, nie można przenieść predykatu poniżej agregacji i źródło jest skanowane w całości.

Uwzględnij kolumnę predykatu w kluczach łączenia

Uwzględnij kolumnę predykatu w warunku łączenia, aby silnik mógł odrzucić wszystkie dołączone źródła.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;

Jeśli tabela sprzężona nie uwidacznia kolumny predykatu, ta tabela jest skanowana w całości podczas każdego odświeżania.

Diagnozowanie przejścia na pełne ponowne obliczanie

Gdy odświeżanie wraca do pełnego ponownego obliczenia, przyczyna jest raportowana w zdarzeniu planning_information dla przepływu. Zobacz Monitorowanie dzienników zdarzeń potoku. W poniższej tabeli wymieniono przyczyny zgłoszone w zdarzeniu:

Reason Meaning
EXTERNAL_CHANGE_IN_REPLACE_WINDOW Zewnętrzna operacja DML zmodyfikowała wiersze w bieżącym oknie zastępowania.
REPLACE_WHERE_NOT_DETERMINISTIC Predykat używa wyrażeń niedeterministycznych.
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC Poprzednie odświeżanie używało predykatu niedeterministycznego.
UNSUPPORTED_REPLACE_WHERE_PREDICATE Predykatu nie można przesłać do żadnego źródła, bieżące okno zawiera wiersze, które nie zostały przetworzone przez poprzedni predykat, lub w ramach uruchomienia zastosowano zastąpienie predykatu.

Ograniczenia

Przepływy REPLACE WHERE mają następujące ograniczenia:

  • Tabela docelowa musi zostać utworzona w potoku.
  • Tylko jeden przepływ REPLACE WHERE jest dozwolony dla tabeli docelowej.
  • Tabela, która jest celem przepływu REPLACE WHERE, nie może być także celem innego typu przepływu, takiego jak przepływ AUTO CDC lub przepływ dopisywania.
  • Oczekiwania nie są obsługiwane w tabelach objętych przepływami REPLACE WHERE .
  • Informacje o tabelach strumieniowych utworzonych w Databricks SQL można znaleźć tutaj: REPLACE WHERE flows for standalone streaming tables; opisano tam składnię i różnice dotyczące uzupełniania danych historycznych.

Examples

W poniższych przykładach przedstawiono typowe wzorce przepływu REPLACE WHERE .

Przykład 1. Zachowywanie agregacji historycznych ze źródła ograniczonego przechowywania

W tym przykładzie agregaty dzienne są przechowywane bezterminowo, nawet po wygaśnięciu surowych danych w tabeli źródłowej (3-dniowy okres przechowywania):

SQL

CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
  date,
  key,
  SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
  return (
    spark.read.table("events_raw")
      .groupBy("date", "key")
      .agg(F.sum("val").alias("agg"))
  )

Przykład 2. Zapobieganie ponownej kompilacji w przypadku zmiany tabeli wymiarów

Ten przykład pozostawia historyczne wiersze w tabeli faktów bez zmian, gdy zmieniają się atrybuty wymiaru:

SQL

CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
  f.date,
  f.user_id,
  d.region,
  f.revenue
FROM fact_table f
JOIN dim_users d
  ON f.user_id = d.user_id;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
  fact_table = spark.read.table("fact_table").alias("f")
  dim_users = spark.read.table("dim_users").alias("d")
  return (
    fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
      .select(
        col("f.date"),
        col("f.user_id"),
        col("d.region"),
        col("f.revenue"),
      )
  )

Jeśli region użytkownika ulegnie zmianie, tylko ostatnie wiersze zostaną ponownie skompilowane. Wiersze historyczne zachowują wartość regionu z momentu, w którym zostały zapisane. Aby skorygować historyczne wiersze, uruchom ukierunkowane uzupełnienie danych przy użyciu nadpisań predykatu.

Przykład 3. Dodawanie nowej metryki bez ponownego skompilowania pełnej historii

W tym przykładzie pokazano, jak rozwijać definicję tabeli i wypełniać tylko zakres docelowy:

  1. Zdefiniuj początkową tabelę:

    SQL

    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    from pyspark.sql.functions import col
    
    @dp.table(
      replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
    )
    def clickstream_daily():
      return (
        spark.read.table("clickstream_raw")
          .groupBy("event_date", "page_id")
          .agg(F.count("*").alias("clicks"))
      )
    
  2. Zaktualizuj zapytanie, aby dodać uniq_users:

    SQL

    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks,
      COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;
    

    Python

    @dp.table(
      replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
    )
    def clickstream_daily():
      return (
        spark.read.table("clickstream_raw")
          .groupBy("event_date", "page_id")
          .agg(
            F.count("*").alias("clicks"),
            F.countDistinct("user_id").alias("uniq_users"),
          )
      )
    
  3. Uzupełnij nową metrykę za ostatnie 30 dni:

    overrides = [
      {
        "flow_name": "clickstream_daily",
        "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'",
      }
    ]
    
    resp = start_update_with_replace_where(
      pipeline_id="<pipeline-id>",
      replace_where_overrides=overrides,
      refresh_selection=["clickstream_daily"],
    )
    

    Wiersze starsze niż zakres uzupełniony wstecz zawierają NULL dla uniq_users.

Przykład 4: Iteracja w małym oknie przed uzupełnieniem pełnej historii

W tym przykładzie pokazano, jak zweryfikować logikę zapytań w małym oknie danych przed przetworzeniem pełnego zakresu historycznego.

Rozpocznij od krótkiego okna, aby podczas modyfikowania zapytania każde odświeżenie przeliczało tylko ostatnie 7 dni:

SQL

CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
  return (
    spark.read.table("marketing_events")
      .groupBy("event_date", "campaign_id")
      .agg(F.sum("revenue").alias("total_revenue"))
  )

Po sfinalizowaniu zapytania użyj nadpisania predykatu, aby jednorazowo uzupełnić dane historyczne:

overrides = [
  {
    "flow_name": "revenue_attribution",
    "predicate_override": "event_date >= date_add(current_date(), -365)",
  }
]

resp = start_update_with_replace_where(
  pipeline_id="<pipeline-id>",
  replace_where_overrides=overrides,
  refresh_selection=["revenue_attribution"],
)