Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Important
REPLACE-stromen WHERE voor zelfstandige streamingtabellen bevinden zich in de bètaversie.
Op deze pagina wordt beschreven hoe u REPLACE-stromen WHERE gebruikt om een doelsubset van een zelfstandige streamingtabel opnieuw te compileren en te overschrijven zonder de volledige tabelgeschiedenis opnieuw te verwerken. REPLACE-stromen WHERE verwerken late binnenkomende gegevens, upstream-herverwerking, schemaontwikkeling en backfills.
Met een REPLACE WHERE flow definieert u een predicaat op de doeltabel. Alle rijen die overeenkomen met het predicaat worden verwijderd en vervangen door de bronquery voor hetzelfde predicaatbereik opnieuw te evalueren. Rijen die niet overeenkomen met het predicaat blijven ongewijzigd.
Requirements
REPLACE-stromen WHERE hebben de volgende vereisten:
- De streamingtabel moet het
PREVIEWkanaal gebruiken. Ziechannelin Pijplijnconfiguraties. - Databricks raadt Unity Catalog en serverloze berekeningen aan. Incrementeel vernieuwen wordt alleen ondersteund op serverloze berekeningen.
Wanneer moet u REPLACE-stromen WHERE gebruiken
Gebruik REPLACE-stromen WHERE voor de volgende scenario's:
- Incrementele batchverwerking zonder streaming-semantiek: nieuwe rijen in batches verwerken zonder streamingconcepten zoals watermerken te beheren.
- Selectief opnieuw verwerken: alleen rijen die overeenkomen met een predicaat opnieuw compileren terwijl alle andere rijen ongewijzigd blijven.
-
Scenario's die verder gaan dan de standaard gerealiseerde weergavemogelijkheden:
- Doeltabellen met een langere retentie dan de brontabellen
- Hercomputatie voorkomen wanneer een dimensietabel wordt gewijzigd
- Schemaontwikkeling zonder volledige geschiedenis opnieuw te compileren
Een REPLACE-werkstroom maken WHERE
Gebruik de FLOW REPLACE WHERE clausule inline samen met CREATE OR REFRESH STREAMING TABLE:
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.channel = 'PREVIEW')
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Tijdens het vernieuwen worden alle rijen in de doeltabel die overeenkomen met het predicaat verwijderd, wordt de bronquery opnieuw gecomputeerd voor hetzelfde predicaatbereik en worden de nieuwe resultaten ingevoegd. In dit voorbeeld worden alle rijen uit de afgelopen 7 dagen verwijderd uit orders_enriched en opnieuw berekend met behulp van de bronquery.
U hoeft het predicaat niet toe te voegen aan de bronquery. De pijplijnengine past deze automatisch toe bij het lezen van de bron.
Note
BY NAME is vereist. Het zorgt ervoor dat kolommen op naam worden vergeleken in plaats van op positie.
Historische gegevens invullen
Als u backfills wilt uitvoeren, voert u DML-instructies rechtstreeks uit in de doeltabel:
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
Gedrag bij volledige vernieuwing
Een volledige verversing van een REPLACE WHERE flow voert de bronquery opnieuw uit, waarbij alleen het huidige predicaat wordt gebruikt. Rijen die door DML-instructies buiten het huidige predicaatbereik zijn ingevoegd, worden definitief verwijderd.
Warning
Met een volledige vernieuwing worden alle bestaande gegevens gewist en wordt de stroom opnieuw uitgevoerd met alleen het gedefinieerde predicaat. Als een pijplijn een jaar heeft gedraaid met een predicaat van 7 dagen, leidt een volledige vernieuwing ertoe dat de tabel alleen gegevens van de laatste 7 dagen bevat. Alle oudere rijen worden definitief verwijderd.
REFRESH STREAMING TABLE orders_enriched FULL;
Om volledige vernieuwingen van een tabel te voorkomen, stelt u de tabeleigenschap pipelines.reset.allowed in op false:
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.reset.allowed = 'false')
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
...
Stapsgewijs vernieuwen
REPLACE-stromen WHERE maken gebruik van incrementeel vernieuwen indien mogelijk, waarbij alleen de brongegevens die zijn gewijzigd sinds de laatste vernieuwing opnieuw worden verwerkt in plaats van het volledige vervangingsvenster opnieuw te compileren. Incrementeel vernieuwen vereist serverloze berekeningen.
Wanneer incrementeel vernieuwen van toepassing is
Alle volgende waarden moeten waar zijn:
- De pijplijn draait op serverless computing.
- De queryvorm wordt ondersteund. Zie Incrementeel vernieuwen voor de ondersteunde operatorset.
- Het predicaat verwijst naar basiskolommen uit een brontabel. Predicaten voor afgeleide waarden, zoals uitvoer van statistische functies of vensterfuncties, kunnen niet worden gepusht naar een bron, waardoor incrementeel vernieuwen wordt uitgeschakeld.
- Er zijn geen externe DML-rijen gewijzigd in het huidige vervangingsvenster. DML waarmee rijen buiten het huidige venster worden gewijzigd, wordt niet beïnvloed.
- Het huidige vervangingsvenster bevat geen rijen die door het vorige predicaat zijn uitgesloten. Als u het predicaat breder maakt om een bereik te dekken dat niet eerder is verwerkt, valt die vernieuwing terug op volledige hercomputatie. Volgende vernieuwingen komen weer in aanmerking voor incrementele vernieuwing.
- Het predicaat is deterministisch. Predicaten die niet-deterministische functies gebruiken, zoals
rand(), schakelen het incrementeel vernieuwen uit. Tijdelijke functies zoalscurrent_date()zijn toegestaan.
De eerste verversing van een flow is altijd een volledige berekening. Als niet aan een voorwaarde is voldaan, valt die verversing terug op een volledige herberekening van het huidige vervangingsvenster.
Aanbevolen procedures voor incrementeel vernieuwen
Volg deze richtlijnen zodat REPLACE-WHEREwerkstromen in aanmerking blijven komen voor stapsgewijs vernieuwen.
Een bewegende ondergrens gebruiken
Predicaten met een bewegende ondergrens blijven voor onbepaalde tijd in aanmerking komen voor incrementeel vernieuwen.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
Een bewegende bovengrens, zoals date BETWEEN date_add(current_date(), -7) AND current_date(), kan het venster verschuiven om eerder uitgesloten rijen op te nemen, waardoor een eenmalige terugval naar volledige hercomputatie wordt geactiveerd.
Neem de predicaatkolom op in GROUP BY
Neem bij het aggregeren de predicaatkolom op in GROUP BY, zodat de engine het predicaat onder de aggregatie kan verplaatsen.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
Als de predicaatkolom ontbreekt in GROUP BY, kan het predicaat niet onder de aggregatie worden doorgeschoven en wordt de bron volledig gescand.
De predicaatkolom opnemen in sleutels voor joins
Neem de predicaatkolom op in de joinvoorwaarde, zodat de engine alle gekoppelde bronnen kan verwijderen.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Als een gekoppelde tabel de predicaatkolom niet beschikbaar maakt, wordt die tabel bij elke vernieuwing volledig gescand.
Diagnosticeer terugval naar volledige herberekening
Wanneer een verversing terugvalt op een volledige herberekening, wordt de reden vermeld in de gebeurtenis planning_information van de flow. Zie Gebeurtenislogboeken van de pijplijn controleren. De volgende tabel bevat de redenen die in de gebeurtenis zijn gerapporteerd:
| Reason | Meaning |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
Een externe DML heeft rijen gewijzigd in het huidige vervangingsvenster. |
REPLACE_WHERE_NOT_DETERMINISTIC |
Het predicaat maakt gebruik van niet-deterministische expressies. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
De vorige vernieuwing heeft een niet-deterministisch predicaat gebruikt. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
Het predicaat kan niet naar een bron worden gepusht, het huidige venster bevat rijen die niet worden verwerkt door het vorige predicaat of de uitvoering maakt gebruik van een predicaatoverschrijving. |
Examples
In de volgende voorbeelden ziet u veelvoorkomende REPLACE-stroompatronen WHERE .
Voorbeeld 1: Historische aggregaties van een bron met beperkte retentie behouden
In dit voorbeeld worden dagelijkse aggregaties voor onbepaalde tijd bijgehouden, zelfs nadat onbewerkte gegevens uit de brontabel zijn verouderd (retentie van drie dagen):
CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Voorbeeld 2: Hercomputatie voorkomen wanneer een dimensietabel wordt gewijzigd
In dit voorbeeld blijven historische feitenrijen ongewijzigd wanneer dimensiekenmerken veranderen:
CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Als de regio van een gebruiker verandert, worden alleen recente rijen opnieuw gecomputeerd. Historische rijen behouden de regiowaarde op het moment dat ze zijn geschreven.
Voorbeeld 3: Een nieuwe metriek toevoegen zonder de volledige geschiedenis opnieuw te compileren
In dit voorbeeld ziet u hoe u een tabeldefinitie ontwikkelt en alleen een doelbereik bijvult:
Definieer de eerste tabel:
CREATE OR REFRESH STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Werk de query bij om toe te voegen
uniq_users:CREATE OR REFRESH STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;Rijen ouder dan het venster van 7 dagen bevatten
NULLvooruniq_users.
Voorbeeld 4: Itereren op een klein venster voordat de volledige historische gegevens worden aangevuld
In dit voorbeeld ziet u hoe u querylogica in een klein gegevensvenster valideert voordat u het volledige historische bereik verwerkt.
Begin met een kort venster om metrische gegevens te valideren en bedrijfslogica te herhalen met lagere rekenkosten:
CREATE OR REFRESH STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Bij een kort tijdvenster worden bij elke verversing alleen de laatste zeven dagen opnieuw berekend, dus pas de query zo vaak als nodig aan voordat u een volledige historische run uitvoert.
Zodra de query is voltooid, gebruikt u DML om het volledige historische bereik te vullen:
INSERT INTO revenue_attribution
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
WHERE event_date < date_add(current_date(), -7)
GROUP BY ALL;