Megjegyzés
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhat bejelentkezni vagy módosítani a címtárat.
Az oldalhoz való hozzáféréshez engedély szükséges. Megpróbálhatja módosítani a címtárat.
Important
A REPLACE WHERE folyamatok béta verzióban vannak.
Ez a lap bemutatja, hogyan használhatJA a REPLACE WHERE folyamatokat a Lakeflow Spark deklaratív folyamataiban egy tábla célzott részhalmazának újrafordítására és felülírására a teljes táblaelőzmények újrafeldolgozása nélkül. A REPLACE WHERE folyamatok kezelik a későn érkező adatokat, a felsőbb rétegbeli újrafeldolgozást, a sémafejlődést és a háttérbetöltéseket.
A REPLACE WHERE folyamattal meghatároz egy predikátumot a céltáblán. A predikátumnak megfelelő összes sort törlik, majd az ugyanarra a predikátumtartományra vonatkozó forráslekérdezés újbóli kiértékelésének eredményével helyettesítik. A predikátumnak nem megfelelő sorok érintetlenek maradnak.
Requirements
A REPLACE-folyamatokra WHERE a következő követelmények vonatkoznak:
- A pipeline-nak a
PREVIEWcsatornát kell használnia. - A Databricks a Unity Catalog és a kiszolgáló nélküli számítás használatát javasolja. A növekményes frissítés csak kiszolgáló nélküli számítás esetén támogatott.
Mikor érdemes a REPLACE WHERE folyamatokat használni?
A REPLACE WHERE folyamatokat a következő forgatókönyvekhez használhatja:
- Növekményes kötegelt feldolgozás streaming szemantika nélkül: Új sorok feldolgozása kötegekben, streamingfogalmak — például vízjelek — kezelése nélkül.
- Szelektív újrafeldolgozás: Csak a predikátumnak megfelelő sorok újrafordítása, miközben az összes többi sort érintetlenül hagyja.
-
A standard materializált nézet képességein túli forgatókönyvek:
- A forrásénál hosszabb megőrzési idejű céltáblák
- A dimenziótáblák módosításakor történő újraszámítás megakadályozása
- Sémafejlődés a teljes előzmények újrafordítása nélkül
REPLACE WHERE folyamat létrehozása
Definiálja a REPLACE WHERE folyamatokat SQL-ben vagy Pythonban.
SQL
Használja a(z) FLOW REPLACE WHERE záradékot egy sorban a(z) CREATE STREAMING TABLE elemmel:
CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Másik lehetőségként használja a hosszú formátumú szintaxist CREATE FLOW :
CREATE STREAMING TABLE orders_enriched;
CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Python
A Python a tábla és a folyamat egyetlen utasításban van definiálva. A folyamat ugyanazt a nevet örökli, mint a tábla:
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
product_dim = spark.read.table("product_dim")
return orders_fct.join(product_dim, "product_id")
A replace_where paraméter pySpark-oszlopkifejezést vagy sztring-predikátumot fogad el.
Ezekben a példákban az elmúlt 7 nap összes sorát törlik a(z) orders_enriched-ból/-ből, majd a forráslekérdezés alapján újraszámítják őket. Nem kell hozzáadnia a predikátumot a forrás lekérdezéshez. A pipeline-motor automatikusan alkalmazza azt a forrás olvasásakor.
Note
BY NAME kötelező az SQL-ben. Az oszlopokat név alapján párosítja, nem pedig pozíció alapján.
Előzményadatok visszatöltése
Ha az ütemezett frissítéseken kívül szeretne előzménysorokat vagy javított sorokat írni a céltáblába, válasszon két mechanizmust az előzményadatok helyétől függően:
- Predikátum felülbírálása: Futtassa újra a folyamat forráslekérdezését egy egyszeri predikátumtartománnyal. Akkor használható, ha az előzményadatok ugyanabból a forrásból származnak, mint a növekményes adatok.
- DML-utasítások: Szúrja be közvetlenül a céltáblába, megkerülve a folyamatot. Akkor használható, ha az előzményadatok más forrásban élnek, mint a növekményes adatok.
Predikátum felülírások
Felülbírálja a REPLACE WHERE predikátumot egyetlen folyamatfrissítéshez a folyamatdefiníció módosítása nélkül. A predikátum-felülbírálások egyszeriek, csak az aktuális frissítésre vonatkoznak, és nem befolyásolják a jövőbeli futtatásokat.
Példa: Kezdeti előzménybetöltés
Az előzményadatok egyszeri visszatöltésének végrehajtása az adatfolyam első beállításakor:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
)
print(resp)
Példa: Egy oszlop helyesbítése egy adott időszakra
Az oszlopdefiníció frissítése után töltse vissza a módosítást a megadott korábbi tartományra:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30)",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
refresh_selection=["orders_enriched"],
)
print(resp)
Több dimenzió kombinálása egyetlen predikátum-felülbírálásban:
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
}
]
Segédfüggvény: start_update_with_replace_where
Használja a folyamatfrissítési API-t egy jegyzetfüzetből predikátum-felülbírálások küldéséhez:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse
def start_update_with_replace_where(
pipeline_id: str,
replace_where_overrides: list[dict],
refresh_selection: list[str] = None,
) -> StartUpdateResponse:
"""Start a pipeline update with REPLACE WHERE predicate overrides."""
client = WorkspaceClient()
body = {
"pipeline_id": pipeline_id,
"cause": "JOB_TASK",
"update_cause_details": {
"job_details": {"performance_target": "PERFORMANCE"}
},
"replace_where_overrides": replace_where_overrides,
}
if refresh_selection:
body["refresh_selection"] = refresh_selection
res = client.api_client.do(
"POST",
f"/api/2.0/pipelines/{pipeline_id}/updates",
body=body,
headers={"Accept": "application/json", "Content-Type": "application/json"},
)
return StartUpdateResponse.from_dict(res)
DML-utasítások
Futtassa a DML-utasításokat közvetlenül a céltáblán a folyamaton kívülről a kezdeti terhelések vagy javítások végrehajtásához, például egy régi táblából való betöltéshez:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
A DML-n keresztül beszúrt sorokra nem vonatkozik a REPLACE WHERE predikátum, és nem maradnak meg az ütemezett frissítések között, kivéve, ha azok egy későbbi futtatás predikátumtartományához tartoznak.
Teljes frissítési viselkedés
A REPLACE WHERE folyam teljes frissítése kizárólag az aktuális predikátumot használva újra lefuttatja a forráslekérdezést. A predikátum-felülbírálások vagy az aktuális predikátumtartományon kívüli DML-utasítások által beszúrt sorok véglegesen törlődnek.
Warning
A teljes frissítés törli az összes meglévő adatot, és csak a meghatározott predikátum használatával hajtja végre újra a folyamatot. Ha egy folyamat egy éve fut egy 7 napos predikátummal, a teljes frissítés csak az utolsó 7 napot tartalmazó táblát eredményezi. Az összes régebbi sor véglegesen törlődik.
Ha meg szeretné akadályozni egy tábla teljes frissítését, állítsa a táblatulajdonságot pipelines.reset.allowed a következőre false: . Lásd : Folyamattulajdonságok referenciája.
Növekményes frissítés
A REPLACE WHERE folyamatok lehetőség szerint növekményes frissítést használnak, és a teljes csereablak újrafordítása helyett csak az utolsó frissítés óta megváltozott forrásadatokat dolgozza fel újra. A növekményes frissítés kiszolgáló nélküli számítást igényel.
Növekményes frissítés alkalmazása esetén
Az alábbiak mindegyikének igaznak kell lennie:
- A folyamat kiszolgáló nélküli számításon fut.
- A lekérdezésalak támogatott. Lásd a támogatott operátorkészlet növekményes frissítését .
- A predikátum egy forrástáblából származó alaposzlopokra hivatkozik. A származtatott értékek, például az összesítő vagy az ablakfüggvény kimeneteinek predikátumai nem küldhetők le egy forrásba, amely letiltja a növekményes frissítést.
- Az aktuális csereintervallumban egyetlen külső DML sem módosította a sorokat. Az aktuális ablakon kívüli sorokat módosító DML-t nem érinti.
- Az aktuális csereablak nem tartalmaz olyan sorokat, amelyeket az előző predikátum kizárt. Ha kibővíti a predikátumot egy korábban nem feldolgozott tartomány lefedésére, az egy frissítés vissza fog térni a teljes újraszámításra. A további frissítések ismét jogosultak a növekményes frissítésre.
- A predikátum determinisztikus. A nem determinisztikus függvényeket használó predikátumok, mint például a
rand(), letiltják a növekményes frissítést. Az olyan időbeli függvények, mintcurrent_date()az engedélyezettek.
Bármely folyamat első frissítése mindig teljes újraszámítás. Ha bármely feltétel nem teljesül, az adott frissítés visszatér az aktuális csereablak teljes újraszámítására.
Ajánlott eljárások a növekményes frissítéshez
Kövesse ezeket az irányelveket, így a REPLACE-folyamatok WHERE továbbra is jogosultak maradnak a növekményes frissítésre.
Mozgó alsó határ használata
A mozgó alsó határral rendelkező predikátumok korlátlan ideig jogosultak maradnak a növekményes frissítésre.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
A mozgó felső határ (például date BETWEEN date_add(current_date(), -7) AND current_date()) eltolhatja az ablakot a korábban kizárt sorok belefoglalására, és egyszeri visszalépést válthat ki teljes újraszámításra.
A predikátum oszlop belefoglalása a következőbe: GROUP BY
Összesítéskor vegye fel a predikátumoszlopot a(z) GROUP BY elembe, hogy a motor az aggregáció alá küldhesse le a predikátumot.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
Ha a predikátumoszlop hiányzik a(z) GROUP BY-ból, a predikátum nem tolható le az aggregáció alá, és a rendszer teljes egészében beolvassa a forrást.
Vegye fel a predikátumoszlopot az összekapcsolási kulcsok közé
Adja hozzá a predikátum oszlopot az illesztési feltételhez, hogy a motor az összes csatlakoztatott forrást meg tudja metszeni.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Ha egy összekapcsolt tábla nem teszi elérhetővé a predikátum oszlopot, a program az egyes frissítéseken teljes egészében beolvasja a táblát.
A teljes újraszámításra való visszaállás diagnosztikája
Ha egy frissítés visszaesik a teljes újraszámításra, az ok a folyamat eseményében planning_information lesz jelentve. Lásd: A folyamatcsatorna eseménynaplóinak figyelése. Az alábbi táblázat az eseményben jelentett okokat sorolja fel:
| Reason | Meaning |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
Egy külső DML sorokat módosított az aktuális csereablakban. |
REPLACE_WHERE_NOT_DETERMINISTIC |
A predikátum nem determinisztikus kifejezéseket használ. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
Az előző frissítés nem determinisztikus predikátumot használt. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
A predikátum nem küldhető le egyetlen forrásba sem, az aktuális ablak olyan sorokat tartalmaz, amelyeket az előző predikátum nem dolgoz fel, vagy a futtatás predikátum-felülbírálást használ. |
Limitations
A REPLACE-folyamatokra WHERE a következő korlátozások vonatkoznak:
- A céltáblát a folyamaton belül kell létrehozni.
- Céltáblánként csak egy REPLACE WHERE folyamat engedélyezett.
- Egy REPLACE WHERE folyamat által célzott tábla nem lehet egy másik folyamattípus, például egy AUTO CDC folyamat vagy egy hozzáfűzési folyamat célpontja is.
- A REPLACE WHERE folyamatok által megcélzott táblák nem támogatják az elvárásokat.
- A Databricks SQL-ben létrehozott streaming táblák esetén a szintaxisra és a visszatöltési különbségekre vonatkozóan lásd: REPLACE WHERE folyamatok önálló streaming táblákhoz.
Examples
Az alábbi példák a REPLACE WHERE folyamatának gyakori mintáit mutatják be.
1. példa: A korábbi összesítések megőrzése korlátozott megőrzési forrásból
Ez a példa határozatlan ideig tartja fenn a napi összesítéseket, még akkor is, ha a nyers adatok a forrástáblán kívülre öregednek (3 napos megőrzés):
SQL
CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
return (
spark.read.table("events_raw")
.groupBy("date", "key")
.agg(F.sum("val").alias("agg"))
)
2. példa: A dimenziótáblák módosításakor történő újraszámítás megakadályozása
Ez a példa a dimenzióattribútumok változásakor változatlanul hagyja a korábbi ténysorokat:
SQL
CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
fact_table = spark.read.table("fact_table").alias("f")
dim_users = spark.read.table("dim_users").alias("d")
return (
fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
.select(
col("f.date"),
col("f.user_id"),
col("d.region"),
col("f.revenue"),
)
)
Ha egy felhasználó régiója megváltozik, csak a legutóbbi sorok lesznek újrafordítve. A historikus sorok megtartják a régió íráskori értékét. A korábbi sorok javításához futtasson egy célzott utólagos feltöltést predikátumfelülírások használatával.
3. példa: Új metrika hozzáadása a teljes előzmények újrafordítása nélkül
Ez a példa bemutatja, hogyan fejleszthet táblázatdefiníciót, és hogyan tölthet vissza csak egy célzott tartományt:
Adja meg a kezdeti táblát:
SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Python
from pyspark import pipelines as dp from pyspark.sql import functions as F from pyspark.sql.functions import col @dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg(F.count("*").alias("clicks")) )Frissítse a lekérdezést, és adja hozzá ezt:
uniq_usersSQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;Python
@dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg( F.count("*").alias("clicks"), F.countDistinct("user_id").alias("uniq_users"), ) )Töltse ki az új metrikát az elmúlt 30 napban:
overrides = [ { "flow_name": "clickstream_daily", "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'", } ] resp = start_update_with_replace_where( pipeline_id="<pipeline-id>", replace_where_overrides=overrides, refresh_selection=["clickstream_daily"], )A visszatöltött tartománynál régebbi sorok a következőt
NULLtartalmazzákuniq_users: .
4. példa: Iterálás egy kis ablakban a teljes előzmények kitöltése előtt
Ez a példa bemutatja, hogyan érvényesítheti a lekérdezési logikát egy kis adatablakban a teljes előzménytartomány feldolgozása előtt.
Kezdje egy rövid ablakkal, hogy az egyes frissítések csak az utolsó 7 napot újrafordíthassák a lekérdezés módosításakor:
SQL
CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
return (
spark.read.table("marketing_events")
.groupBy("event_date", "campaign_id")
.agg(F.sum("revenue").alias("total_revenue"))
)
A lekérdezés véglegesítése után használjon predikátum-felülbírálást egy egyszeri előzmény-visszatöltés végrehajtásához:
overrides = [
{
"flow_name": "revenue_attribution",
"predicate_override": "event_date >= date_add(current_date(), -365)",
}
]
resp = start_update_with_replace_where(
pipeline_id="<pipeline-id>",
replace_where_overrides=overrides,
refresh_selection=["revenue_attribution"],
)