Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Important
REPLACE-stromen WHERE bevinden zich in de bètaversie.
Op deze pagina wordt beschreven hoe u REPLACE-stromen WHERE in Lakeflow Spark-declaratieve pijplijnen gebruikt om een doelsubset van een tabel opnieuw te compileren en te overschrijven zonder de volledige tabelgeschiedenis opnieuw te verwerken. REPLACE-stromen WHERE verwerken late binnenkomende gegevens, upstream-herverwerking, schemaontwikkeling en backfills.
Met een REPLACE WHERE flow definieert u een predicaat op de doeltabel. Alle rijen die overeenkomen met het predicaat worden verwijderd en vervangen door de bronquery voor hetzelfde predicaatbereik opnieuw te evalueren. Rijen die niet overeenkomen met het predicaat, blijven ongewijzigd.
Requirements
REPLACE-stromen WHERE hebben de volgende vereisten:
- Uw pijplijn moet het
PREVIEWkanaal gebruiken. - Databricks raadt Unity Catalog en serverloze berekeningen aan. Incrementeel vernieuwen wordt alleen ondersteund op serverloze berekeningen.
Wanneer moet u REPLACE-stromen WHERE gebruiken
Gebruik REPLACE-stromen WHERE voor de volgende scenario's:
- Incrementele batchverwerking zonder streaming-semantiek: nieuwe rijen in batches verwerken zonder streamingconcepten zoals watermerken te beheren.
- Selectief opnieuw verwerken: alleen rijen die overeenkomen met een predicaat opnieuw compileren terwijl alle andere rijen ongewijzigd blijven.
-
Scenario's die verder gaan dan de standaard gerealiseerde weergavemogelijkheden:
- Doeltabellen met een langere retentie dan de brontabellen
- Hercomputatie voorkomen wanneer een dimensietabel wordt gewijzigd
- Schemaontwikkeling zonder volledige geschiedenis opnieuw te compileren
Een REPLACE-werkstroom maken WHERE
Definieer REPLACE WHERE stromen in SQL of Python.
SQL
Gebruik de FLOW REPLACE WHERE clausule inline samen met CREATE STREAMING TABLE:
CREATE STREAMING TABLE orders_enriched
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
U kunt ook de syntaxis van de lange vorm CREATE FLOW gebruiken:
CREATE STREAMING TABLE orders_enriched;
CREATE FLOW orders_enriched AS
INSERT INTO orders_enriched BY NAME
REPLACE WHERE date >= date_add(current_date(), -7)
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Python
In Python worden de tabel en de stroom in één instructie gedefinieerd. De stroom neemt dezelfde naam over als de tabel:
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 7)
)
def orders_enriched():
orders_fct = spark.read.table("orders_fct").select("date", "order_id", "region", "qty", "price")
product_dim = spark.read.table("product_dim")
return orders_fct.join(product_dim, "product_id")
De replace_where parameter accepteert een PySpark-kolomexpressie of een tekenreekspredicaat.
In deze voorbeelden worden alle rijen uit de afgelopen 7 dagen verwijderd uit orders_enriched en opnieuw berekend met behulp van de bronquery. U hoeft het predicaat niet toe te voegen aan de bronquery. De pijplijnengine past deze automatisch toe bij het lezen van de bron.
Note
BY NAME is vereist in SQL. Het koppelt kolommen op naam in plaats van op positie.
Historische gegevens invullen
Als u historische of gecorrigeerde rijen naar de doeltabel wilt schrijven buiten geplande vernieuwingen, kiest u tussen twee mechanismen op basis van waar de historische gegevens zich bevinden:
- Predicaatoverschrijvingen: voer de bronquery van de stroom opnieuw uit voor een eenmalig predicaatbereik. Gebruik deze functie wanneer de historische gegevens afkomstig zijn van dezelfde bron als de incrementele gegevens.
- DML-instructies: Voeg rechtstreeks in de doeltabel in, waarbij de stroom wordt overgeslagen. Gebruik deze functie wanneer de historische gegevens zich in een andere bron bevinden dan de incrementele gegevens.
Overschrijvingen van predicaten
Overschrijf het predicaat REPLACE WHERE voor één pijplijnupdate zonder de pijplijndefinitie te wijzigen. Predicaatoverschrijvingen zijn eenmalig, zijn alleen van toepassing op de huidige update en hebben geen invloed op toekomstige uitvoeringen.
Voorbeeld: Eerste historische belasting
Ga als volgt te werk om eenmalig historische gegevens aan te vullen wanneer u voor het eerst een pijplijn instelt:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date BETWEEN '2020-01-01' AND '2024-12-31'",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
)
print(resp)
Voorbeeld: Een kolom corrigeren voor een specifieke periode
Na het bijwerken van een kolomdefinitie past u de wijziging toe op een specifiek historisch gegevensbereik:
pipeline_id = "<pipeline-id>"
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30)",
}
]
resp = start_update_with_replace_where(
pipeline_id=pipeline_id,
replace_where_overrides=overrides,
refresh_selection=["orders_enriched"],
)
print(resp)
Combineer meerdere dimensies in één predicaatoverschrijving:
overrides = [
{
"flow_name": "orders_enriched",
"predicate_override": "date >= date_add(current_date(), -30) AND region = 'asia'",
}
]
Hulpfunctie: start_update_with_replace_where
Gebruik de API voor pijplijnupdates vanuit een notebook om predicaatoverschrijvingen te verzenden:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import StartUpdateResponse
def start_update_with_replace_where(
pipeline_id: str,
replace_where_overrides: list[dict],
refresh_selection: list[str] = None,
) -> StartUpdateResponse:
"""Start a pipeline update with REPLACE WHERE predicate overrides."""
client = WorkspaceClient()
body = {
"pipeline_id": pipeline_id,
"cause": "JOB_TASK",
"update_cause_details": {
"job_details": {"performance_target": "PERFORMANCE"}
},
"replace_where_overrides": replace_where_overrides,
}
if refresh_selection:
body["refresh_selection"] = refresh_selection
res = client.api_client.do(
"POST",
f"/api/2.0/pipelines/{pipeline_id}/updates",
body=body,
headers={"Accept": "application/json", "Content-Type": "application/json"},
)
return StartUpdateResponse.from_dict(res)
DML-instructies
Voer DML-instructies rechtstreeks uit op de doeltabel van buiten de pijplijn om eerste gegevensladingen of correcties uit te voeren, zoals het laden van gegevens uit een verouderde tabel:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
Rijen die via DML worden ingevoegd, zijn niet onderworpen aan het predicaat REPLACE WHERE en blijven behouden voor geplande vernieuwingen, tenzij ze binnen het predicaatbereik van een toekomstige uitvoering vallen.
Gedrag bij volledige vernieuwing
Een volledige verversing van een REPLACE WHERE flow voert de bronquery opnieuw uit, waarbij alleen het huidige predicaat wordt gebruikt. Rijen die zijn ingevoegd door predicaatoverschrijvingen of DML-instructies buiten het huidige predicaatbereik, worden definitief verwijderd.
Warning
Met een volledige vernieuwing worden alle bestaande gegevens gewist en wordt de stroom opnieuw uitgevoerd met alleen het gedefinieerde predicaat. Als een pijplijn een jaar heeft gedraaid met een predicaat van 7 dagen, leidt een volledige vernieuwing ertoe dat de tabel alleen gegevens van de laatste 7 dagen bevat. Alle oudere rijen worden definitief verwijderd.
Als u volledige vernieuwingen in een tabel wilt voorkomen, stelt u de tabeleigenschap pipelines.reset.allowed in op false. Zie naslaginformatie over pijplijneigenschappen.
Stapsgewijs vernieuwen
REPLACE-stromen WHERE maken gebruik van incrementeel vernieuwen indien mogelijk, waarbij alleen de brongegevens die zijn gewijzigd sinds de laatste vernieuwing opnieuw worden verwerkt in plaats van het volledige vervangingsvenster opnieuw te compileren. Incrementeel vernieuwen vereist serverloze berekeningen.
Wanneer incrementeel vernieuwen van toepassing is
Alle volgende waarden moeten waar zijn:
- De pijplijn draait op serverless computing.
- De queryvorm wordt ondersteund. Zie Incrementeel vernieuwen voor de ondersteunde operatorset.
- Het predicaat verwijst naar basiskolommen uit een brontabel. Predicaten voor afgeleide waarden, zoals uitvoer van statistische functies of vensterfuncties, kunnen niet worden gepusht naar een bron, waardoor incrementeel vernieuwen wordt uitgeschakeld.
- Er zijn geen externe DML-rijen gewijzigd in het huidige vervangingsvenster. DML waarmee rijen buiten het huidige venster worden gewijzigd, wordt niet beïnvloed.
- Het huidige vervangingsvenster bevat geen rijen die door het vorige predicaat zijn uitgesloten. Als u het predicaat breder maakt om een bereik te dekken dat niet eerder is verwerkt, valt die vernieuwing terug op volledige hercomputatie. Volgende vernieuwingen komen weer in aanmerking voor incrementele vernieuwing.
- Het predicaat is deterministisch. Predicaten die niet-deterministische functies gebruiken, zoals
rand(), schakelen het incrementeel vernieuwen uit. Tijdelijke functies zoalscurrent_date()zijn toegestaan.
De eerste verversing van een flow is altijd een volledige berekening. Als niet aan een voorwaarde is voldaan, valt die verversing terug op een volledige herberekening van het huidige vervangingsvenster.
Aanbevolen procedures voor incrementeel vernieuwen
Volg deze richtlijnen zodat REPLACE-WHEREwerkstromen in aanmerking blijven komen voor stapsgewijs vernieuwen.
Een bewegende ondergrens gebruiken
Predicaten met een bewegende ondergrens blijven voor onbepaalde tijd in aanmerking komen voor incrementeel vernieuwen.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
Een bewegende bovengrens, zoals date BETWEEN date_add(current_date(), -7) AND current_date(), kan het venster verschuiven om eerder uitgesloten rijen op te nemen, waardoor een eenmalige terugval naar volledige hercomputatie wordt geactiveerd.
Neem de predicaatkolom op in GROUP BY
Neem bij het aggregeren de predicaatkolom op in GROUP BY, zodat de engine het predicaat onder de aggregatie kan verplaatsen.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
Als de predicaatkolom ontbreekt in GROUP BY, kan het predicaat niet onder de aggregatie worden doorgeschoven en wordt de bron volledig gescand.
De predicaatkolom opnemen in sleutels voor joins
Neem de predicaatkolom op in de joinvoorwaarde, zodat de engine alle gekoppelde bronnen kan verwijderen.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Als een gekoppelde tabel de predicaatkolom niet beschikbaar maakt, wordt die tabel bij elke vernieuwing volledig gescand.
Diagnosticeer terugval naar volledige herberekening
Wanneer een verversing terugvalt op een volledige herberekening, wordt de reden vermeld in de gebeurtenis planning_information van de flow. Zie Gebeurtenislogboeken van de pijplijn controleren. De volgende tabel bevat de redenen die in de gebeurtenis zijn gerapporteerd:
| Reason | Meaning |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
Een externe DML heeft rijen gewijzigd in het huidige vervangingsvenster. |
REPLACE_WHERE_NOT_DETERMINISTIC |
Het predicaat maakt gebruik van niet-deterministische expressies. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
De vorige vernieuwing heeft een niet-deterministisch predicaat gebruikt. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
Het predicaat kan niet naar een bron worden gepusht, het huidige venster bevat rijen die niet worden verwerkt door het vorige predicaat of de uitvoering maakt gebruik van een predicaatoverschrijving. |
Limitations
REPLACE-stromen WHERE hebben de volgende beperkingen:
- De doeltabel moet binnen de pipeline worden gemaakt.
- Er is slechts één REPLACE-stroom WHERE toegestaan per doeltabel.
- Een tabel waarop een REPLACE-flow WHERE is gericht, kan niet ook het doel zijn van een ander flowtype, zoals een AUTO CDC-flow of een append-flow.
- Expectaties worden niet ondersteund bij tabellen die het doel zijn van REPLACE WHERE-flows.
- Zie REPLACE-flows WHERE voor zelfstandige streamingtabellen voor syntaxis- en backfillverschillen voor streamingtabellen die zijn gemaakt in Databricks SQL.
Examples
In de volgende voorbeelden ziet u veelvoorkomende REPLACE-stroompatronen WHERE .
Voorbeeld 1: Historische aggregaties van een bron met beperkte retentie behouden
In dit voorbeeld worden dagelijkse aggregaties voor onbepaalde tijd bijgehouden, zelfs nadat onbewerkte gegevens uit de brontabel zijn verouderd (retentie van drie dagen):
SQL
CREATE STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 3)
)
def events_agg():
return (
spark.read.table("events_raw")
.groupBy("date", "key")
.agg(F.sum("val").alias("agg"))
)
Voorbeeld 2: Hercomputatie voorkomen wanneer een dimensietabel wordt gewijzigd
In dit voorbeeld blijven historische feitenrijen ongewijzigd wanneer dimensiekenmerken veranderen:
SQL
CREATE STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("date") >= F.date_sub(F.current_date(), 1)
)
def fact_dim_join():
fact_table = spark.read.table("fact_table").alias("f")
dim_users = spark.read.table("dim_users").alias("d")
return (
fact_table.join(dim_users, col("f.user_id") == col("d.user_id"))
.select(
col("f.date"),
col("f.user_id"),
col("d.region"),
col("f.revenue"),
)
)
Als de regio van een gebruiker verandert, worden alleen recente rijen opnieuw gecomputeerd. Historische rijen behouden de regiowaarde op het moment dat ze zijn geschreven. Als u historische rijen wilt corrigeren, voert u een gerichte aanvulling uit met predicaatoverschrijvingen.
Voorbeeld 3: Een nieuwe metriek toevoegen zonder de volledige geschiedenis opnieuw te compileren
In dit voorbeeld ziet u hoe u een tabeldefinitie ontwikkelt en alleen een doelbereik bijvult:
Definieer de eerste tabel:
SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Python
from pyspark import pipelines as dp from pyspark.sql import functions as F from pyspark.sql.functions import col @dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg(F.count("*").alias("clicks")) )Werk de query bij om toe te voegen
uniq_users:SQL
CREATE STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;Python
@dp.table( replace_where=col("event_date") >= F.date_sub(F.current_date(), 7) ) def clickstream_daily(): return ( spark.read.table("clickstream_raw") .groupBy("event_date", "page_id") .agg( F.count("*").alias("clicks"), F.countDistinct("user_id").alias("uniq_users"), ) )Vul de nieuwe metrische waarde voor de afgelopen 30 dagen opnieuw in:
overrides = [ { "flow_name": "clickstream_daily", "predicate_override": "event_date BETWEEN '2026-01-01' AND '2026-01-30'", } ] resp = start_update_with_replace_where( pipeline_id="<pipeline-id>", replace_where_overrides=overrides, refresh_selection=["clickstream_daily"], )Rijen die ouder zijn dan het teruggevulde bereik bevatten
NULLvooruniq_users.
Voorbeeld 4: Itereren op een klein venster voordat de volledige historische gegevens worden aangevuld
In dit voorbeeld ziet u hoe u querylogica in een klein gegevensvenster valideert voordat u het volledige historische bereik verwerkt.
Begin met een kort tijdvenster, zodat bij elke verversing alleen de afgelopen zeven dagen opnieuw worden berekend terwijl u de query aanpast:
SQL
CREATE STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Python
from pyspark import pipelines as dp
from pyspark.sql import functions as F
from pyspark.sql.functions import col
@dp.table(
replace_where=col("event_date") >= F.date_sub(F.current_date(), 7)
)
def revenue_attribution():
return (
spark.read.table("marketing_events")
.groupBy("event_date", "campaign_id")
.agg(F.sum("revenue").alias("total_revenue"))
)
Nadat de query is voltooid, gebruikt u een predicaatoverschrijving om een eenmalige historische backfill uit te voeren:
overrides = [
{
"flow_name": "revenue_attribution",
"predicate_override": "event_date >= date_add(current_date(), -365)",
}
]
resp = start_update_with_replace_where(
pipeline_id="<pipeline-id>",
replace_where_overrides=overrides,
refresh_selection=["revenue_attribution"],
)