Kötegelt feldolgozás REPLACE WHERE folyamatokkal

Important

A REPLACE WHERE folyamatok béta verzióban vannak.

Ez a lap bemutatja, hogyan használhatJA a REPLACE WHERE folyamatokat a Lakeflow Spark deklaratív folyamataiban egy tábla célzott részhalmazának újrafordítására és felülírására a teljes táblaelőzmények újrafeldolgozása nélkül. A REPLACE WHERE folyamatok kezelik a későn érkező adatokat, a felsőbb rétegbeli újrafeldolgozást, a sémafejlődést és a háttérbetöltéseket.

A REPLACE WHERE folyamattal meghatároz egy predikátumot a céltáblán. A predikátumnak megfelelő összes sort törlik, majd az ugyanarra a predikátumtartományra vonatkozó forráslekérdezés újbóli kiértékelésének eredményével helyettesítik. A predikátumnak nem megfelelő sorok érintetlenek maradnak.

Requirements

A REPLACE-folyamatokra WHERE a következő követelmények vonatkoznak:

  • A pipeline-nak a PREVIEW csatornát kell használnia.
  • A Databricks a Unity Catalog és a kiszolgáló nélküli számítás használatát javasolja. A növekményes frissítés csak kiszolgáló nélküli számítás esetén támogatott.

Mikor érdemes a REPLACE WHERE folyamatokat használni?

A REPLACE WHERE folyamatokat a következő forgatókönyvekhez használhatja:

  • Növekményes kötegelt feldolgozás streaming szemantika nélkül: Új sorok feldolgozása kötegekben, streamingfogalmak — például vízjelek — kezelése nélkül.
  • Szelektív újrafeldolgozás: Csak a predikátumnak megfelelő sorok újrafordítása, miközben az összes többi sort érintetlenül hagyja.
  • A standard materializált nézet képességein túli forgatókönyvek:
    • A forrásénál hosszabb megőrzési idejű céltáblák
    • A dimenziótáblák módosításakor történő újraszámítás megakadályozása
    • Sémafejlődés a teljes előzmények újrafordítása nélkül

REPLACE WHERE folyamat létrehozása

Definiálja a REPLACE WHERE folyamatokat SQL-ben vagy Pythonban.

SQL

Használja a(z) FLOW REPLACE WHERE záradékot egy sorban a(z) CREATE STREAMING TABLE elemmel:

CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Másik lehetőségként használja a hosszú formátumú szintaxist CREATE FLOW :

CREATE STREAMING TABLE orders_enriched;

CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
  o.order_id,
  o.date,
  o.region,
  p.product_name,
  o.qty,
  o.price
FROM orders_fct o
JOIN product_dim p
  ON o.product_id = p.product_id;

Python

A Python a tábla és a folyamat egyetlen utasításban van definiálva. A folyamat ugyanazt a nevet örökli, mint a tábla:

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
  orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
  product_dim = spark.read.table("product_dim")
  return orders_fct.join(product_dim, "product_id")

A replace_where paraméter pySpark-oszlopkifejezést vagy sztring-predikátumot fogad el.

Ezekben a példákban az elmúlt 7 nap összes sorát törlik a(z) orders_enriched-ból/-ből, majd a forráslekérdezés alapján újraszámítják őket. Nem kell hozzáadnia a predikátumot a forrás lekérdezéshez. A pipeline-motor automatikusan alkalmazza azt a forrás olvasásakor.

Note

BY NAME kötelező az SQL-ben. Az oszlopokat név alapján párosítja, nem pedig pozíció alapján.

Előzményadatok visszatöltése

Ha az ütemezett frissítéseken kívül szeretne előzménysorokat vagy javított sorokat írni a céltáblába, válasszon két mechanizmust az előzményadatok helyétől függően:

  • Predikátum felülbírálása: Futtassa újra a folyamat forráslekérdezését egy egyszeri predikátumtartománnyal. Akkor használható, ha az előzményadatok ugyanabból a forrásból származnak, mint a növekményes adatok.
  • DML-utasítások: Szúrja be közvetlenül a céltáblába, megkerülve a folyamatot. Akkor használható, ha az előzményadatok más forrásban élnek, mint a növekményes adatok.

Predikátum felülírások

Felülbírálja a REPLACE WHERE predikátumot egyetlen folyamatfrissítéshez a folyamatdefiníció módosítása nélkül. A predikátum-felülbírálások egyszeriek, csak az aktuális frissítésre vonatkoznak, és nem befolyásolják a jövőbeli futtatásokat.

Példa: Kezdeti előzménybetöltés

Az előzményadatok egyszeri visszatöltésének végrehajtása az adatfolyam első beállításakor:

pipeline_id = "<pipeline-id>"
overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
  }
]

resp = start_update_with_replace_where(
  pipeline_id=pipeline_id,
  replace_where_overrides=overrides,
)
print(resp)

Példa: Egy oszlop helyesbítése egy adott időszakra

Az oszlopdefiníció frissítése után töltse vissza a módosítást a megadott korábbi tartományra:

pipeline_id = "<pipeline-id>"
overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date >= date_add(current_date(), -30)",
  }
]

resp = start_update_with_replace_where(
  pipeline_id=pipeline_id,
  replace_where_overrides=overrides,
  refresh_selection=["orders_enriched"],
)
print(resp)

Több dimenzió kombinálása egyetlen predikátum-felülbírálásban:

overrides = [
  {
    "flow_name": "orders_enriched",
    "predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
  }
]
Segédfüggvény: start_update_with_replace_where

Használja a folyamatfrissítési API-t egy jegyzetfüzetből predikátum-felülbírálások küldéséhez:

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse


def start_update_with_replace_where(
  pipeline_id: str,
  replace_where_overrides: list[dict],
  refresh_selection: list[str] = None,
) -> StartUpdateResponse:
  """Start a pipeline update with REPLACE WHERE predicate overrides."""
  client = WorkspaceClient()

  body = {
    "pipeline_id": pipeline_id,
    "cause": "JOB_TASK",
    "update_cause_details": {
      "job_details": {"performance_target": "PERFORMANCE"}
    },
    "replace_where_overrides": replace_where_overrides,
  }

  if refresh_selection:
    body["refresh_selection"] = refresh_selection

  res = client.api_client.do(
    "POST",
    f"/api/2.0/pipelines/{pipeline_id}/updates",
    body=body,
    headers={"Accept": "application/json", "Content-Type": "application/json"},
  )

  return StartUpdateResponse.from_dict(res)

DML-utasítások

Futtassa a DML-utasításokat közvetlenül a céltáblán a folyamaton kívülről a kezdeti terhelések vagy javítások végrehajtásához, például egy régi táblából való betöltéshez:

INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';

A DML-n keresztül beszúrt sorokra nem vonatkozik a REPLACE WHERE predikátum, és nem maradnak meg az ütemezett frissítések között, kivéve, ha azok egy későbbi futtatás predikátumtartományához tartoznak.

Teljes frissítési viselkedés

A REPLACE WHERE folyam teljes frissítése kizárólag az aktuális predikátumot használva újra lefuttatja a forráslekérdezést. A predikátum-felülbírálások vagy az aktuális predikátumtartományon kívüli DML-utasítások által beszúrt sorok véglegesen törlődnek.

Warning

A teljes frissítés törli az összes meglévő adatot, és csak a meghatározott predikátum használatával hajtja végre újra a folyamatot. Ha egy folyamat egy éve fut egy 7 napos predikátummal, a teljes frissítés csak az utolsó 7 napot tartalmazó táblát eredményezi. Az összes régebbi sor véglegesen törlődik.

Ha meg szeretné akadályozni egy tábla teljes frissítését, állítsa a táblatulajdonságot pipelines.reset.allowed a következőre false: . Lásd : Folyamattulajdonságok referenciája.

Növekményes frissítés

A REPLACE WHERE folyamatok lehetőség szerint növekményes frissítést használnak, és a teljes csereablak újrafordítása helyett csak az utolsó frissítés óta megváltozott forrásadatokat dolgozza fel újra. A növekményes frissítés kiszolgáló nélküli számítást igényel.

Növekményes frissítés alkalmazása esetén

Az alábbiak mindegyikének igaznak kell lennie:

  • A folyamat kiszolgáló nélküli számításon fut.
  • A lekérdezésalak támogatott. Lásd a támogatott operátorkészlet növekményes frissítését .
  • A predikátum egy forrástáblából származó alaposzlopokra hivatkozik. A származtatott értékek, például az összesítő vagy az ablakfüggvény kimeneteinek predikátumai nem küldhetők le egy forrásba, amely letiltja a növekményes frissítést.
  • Az aktuális csereintervallumban egyetlen külső DML sem módosította a sorokat. Az aktuális ablakon kívüli sorokat módosító DML-t nem érinti.
  • Az aktuális csereablak nem tartalmaz olyan sorokat, amelyeket az előző predikátum kizárt. Ha kibővíti a predikátumot egy korábban nem feldolgozott tartomány lefedésére, az egy frissítés vissza fog térni a teljes újraszámításra. A további frissítések ismét jogosultak a növekményes frissítésre.
  • A predikátum determinisztikus. A nem determinisztikus függvényeket használó predikátumok, mint például a rand(), letiltják a növekményes frissítést. Az olyan időbeli függvények, mint current_date() az engedélyezettek.

Bármely folyamat első frissítése mindig teljes újraszámítás. Ha bármely feltétel nem teljesül, az adott frissítés visszatér az aktuális csereablak teljes újraszámítására.

Ajánlott eljárások a növekményes frissítéshez

Kövesse ezeket az irányelveket, így a REPLACE-folyamatok WHERE továbbra is jogosultak maradnak a növekményes frissítésre.

Mozgó alsó határ használata

A mozgó alsó határral rendelkező predikátumok korlátlan ideig jogosultak maradnak a növekményes frissítésre.

FLOW REPLACE WHERE date >= date_add(current_date(), -7)

A mozgó felső határ (például date BETWEEN date_add(current_date(), -7) AND current_date()) eltolhatja az ablakot a korábban kizárt sorok belefoglalására, és egyszeri visszalépést válthat ki teljes újraszámításra.

A predikátum oszlop belefoglalása a következőbe: GROUP BY

Összesítéskor vegye fel a predikátumoszlopot a(z) GROUP BY elembe, hogy a motor az aggregáció alá küldhesse le a predikátumot.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;

Ha a predikátumoszlop hiányzik a(z) GROUP BY-ból, a predikátum nem tolható le az aggregáció alá, és a rendszer teljes egészében beolvassa a forrást.

Vegye fel a predikátumoszlopot az összekapcsolási kulcsok közé

Adja hozzá a predikátum oszlopot az illesztési feltételhez, hogy a motor az összes csatlakoztatott forrást meg tudja metszeni.

FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;

Ha egy összekapcsolt tábla nem teszi elérhetővé a predikátum oszlopot, a program az egyes frissítéseken teljes egészében beolvasja a táblát.

A teljes újraszámításra való visszaállás diagnosztikája

Ha egy frissítés visszaesik a teljes újraszámításra, az ok a folyamat eseményében planning_information lesz jelentve. Lásd: A folyamatcsatorna eseménynaplóinak figyelése. Az alábbi táblázat az eseményben jelentett okokat sorolja fel:

Reason Meaning
EXTERNAL_CHANGE_IN_REPLACE_WINDOW Egy külső DML sorokat módosított az aktuális csereablakban.
REPLACE_WHERE_NOT_DETERMINISTIC A predikátum nem determinisztikus kifejezéseket használ.
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC Az előző frissítés nem determinisztikus predikátumot használt.
UNSUPPORTED_REPLACE_WHERE_PREDICATE A predikátum nem küldhető le egyetlen forrásba sem, az aktuális ablak olyan sorokat tartalmaz, amelyeket az előző predikátum nem dolgoz fel, vagy a futtatás predikátum-felülbírálást használ.

Limitations

A REPLACE-folyamatokra WHERE a következő korlátozások vonatkoznak:

  • A céltáblát a folyamaton belül kell létrehozni.
  • Céltáblánként csak egy REPLACE WHERE folyamat engedélyezett.
  • Egy REPLACE WHERE folyamat által célzott tábla nem lehet egy másik folyamattípus, például egy AUTO CDC folyamat vagy egy hozzáfűzési folyamat célpontja is.
  • A REPLACE WHERE folyamatok által megcélzott táblák nem támogatják az elvárásokat.
  • A Databricks SQL-ben létrehozott streaming táblák esetén a szintaxisra és a visszatöltési különbségekre vonatkozóan lásd: REPLACE WHERE folyamatok önálló streaming táblákhoz.

Examples

Az alábbi példák a REPLACE WHERE folyamatának gyakori mintáit mutatják be.

1. példa: A korábbi összesítések megőrzése korlátozott megőrzési forrásból

Ez a példa határozatlan ideig tartja fenn a napi összesítéseket, még akkor is, ha a nyers adatok a forrástáblán kívülre öregednek (3 napos megőrzés):

SQL

CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
  date,
  key,
  SUM(val) AS agg
FROM events_raw
GROUP BY ALL;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
  return (
    spark.read.table("events_raw")
      .groupBy("date", "key")
      .agg(F.sum("val").alias("agg"))
  )

2. példa: A dimenziótáblák módosításakor történő újraszámítás megakadályozása

Ez a példa a dimenzióattribútumok változásakor változatlanul hagyja a korábbi ténysorokat:

SQL

CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
  f.date,
  f.user_id,
  d.region,
  f.revenue
FROM fact_table f
JOIN dim_users d
  ON f.user_id = d.user_id;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
  fact_table = spark.read.table("fact_table").alias("f")
  dim_users = spark.read.table("dim_users").alias("d")
  return (
    fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
      .select(
        col("f.date"),
        col("f.user_id"),
        col("d.region"),
        col("f.revenue"),
      )
  )

Ha egy felhasználó régiója megváltozik, csak a legutóbbi sorok lesznek újrafordítve. A historikus sorok megtartják a régió íráskori értékét. A korábbi sorok javításához futtasson egy célzott utólagos feltöltést predikátumfelülírások használatával.

3. példa: Új metrika hozzáadása a teljes előzmények újrafordítása nélkül

Ez a példa bemutatja, hogyan fejleszthet táblázatdefiníciót, és hogyan tölthet vissza csak egy célzott tartományt:

  1. Adja meg a kezdeti táblát:

    SQL

    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks
    FROM clickstream_raw
    GROUP BY ALL;
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql import functions as F
    from pyspark.sql.functions import col
    
    @dp.table(
      replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
    )
    def clickstream_daily():
      return (
        spark.read.table("clickstream_raw")
          .groupBy("event_date", "page_id")
          .agg(F.count("*").alias("clicks"))
      )
    
  2. Frissítse a lekérdezést, és adja hozzá ezt: uniq_users

    SQL

    CREATE STREAMING TABLE clickstream_daily
    FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
    SELECT
      event_date,
      page_id,
      COUNT(*) AS clicks,
      COUNT(DISTINCT user_id) AS uniq_users
    FROM clickstream_raw
    GROUP BY ALL;
    

    Python

    @dp.table(
      replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
    )
    def clickstream_daily():
      return (
        spark.read.table("clickstream_raw")
          .groupBy("event_date", "page_id")
          .agg(
            F.count("*").alias("clicks"),
            F.countDistinct("user_id").alias("uniq_users"),
          )
      )
    
  3. Töltse ki az új metrikát az elmúlt 30 napban:

    overrides = [
      {
        "flow_name": "clickstream_daily",
        "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'",
      }
    ]
    
    resp = start_update_with_replace_where(
      pipeline_id="<pipeline-id>",
      replace_where_overrides=overrides,
      refresh_selection=["clickstream_daily"],
    )
    

    A visszatöltött tartománynál régebbi sorok a következőt NULLtartalmazzákuniq_users: .

4. példa: Iterálás egy kis ablakban a teljes előzmények kitöltése előtt

Ez a példa bemutatja, hogyan érvényesítheti a lekérdezési logikát egy kis adatablakban a teljes előzménytartomány feldolgozása előtt.

Kezdje egy rövid ablakkal, hogy az egyes frissítések csak az utolsó 7 napot újrafordíthassák a lekérdezés módosításakor:

SQL

CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
  event_date,
  campaign_id,
  SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;

Python

from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col

@dp.table(
  replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
  return (
    spark.read.table("marketing_events")
      .groupBy("event_date", "campaign_id")
      .agg(F.sum("revenue").alias("total_revenue"))
  )

A lekérdezés véglegesítése után használjon predikátum-felülbírálást egy egyszeri előzmény-visszatöltés végrehajtásához:

overrides = [
  {
    "flow_name": "revenue_attribution",
    "predicate_override": "event_date >= date_add(current_date(), -365)",
  }
]

resp = start_update_with_replace_where(
  pipeline_id="<pipeline-id>",
  replace_where_overrides=overrides,
  refresh_selection=["revenue_attribution"],
)