Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Important
Les flux REPLACE WHERE pour les tables de diffusion en continu autonomes sont en version bêta.
Cette page explique comment utiliser des flux REPLACE WHERE pour recompiler et remplacer un sous-ensemble ciblé d’une table de streaming autonome sans retraiter l’historique de votre table entière. Les flux REPLACE WHERE gèrent les données arrivant en retard, le retraitement en amont, l’évolution du schéma et les remplissages.
Avec un flux REPLACE WHERE , vous définissez un prédicat sur la table cible. Toutes les lignes correspondant au prédicat sont supprimées et remplacées par la réévaluation de la requête source pour cette même plage de prédicats. Les lignes qui ne correspondent pas au prédicat sont laissées intactes.
Requirements
Les flux REPLACE WHERE ont les exigences suivantes :
- Votre table de streaming doit utiliser le canal
PREVIEW. Consultezchannelles configurations de pipeline. - Databricks recommande le catalogue Unity et le calcul serverless. L’actualisation incrémentielle est prise en charge uniquement sur le calcul serverless.
Quand utiliser des flux REPLACE WHERE
Utilisez les flux REPLACE WHERE pour les scénarios suivants :
- Traitement incrémentiel par lots sans sémantique de streaming : traiter de nouvelles lignes par lots sans gérer des concepts liés au streaming tels que les watermarks.
- Retraitement sélectif : recompilez uniquement les lignes qui correspondent à un prédicat tout en laissant toutes les autres lignes intactes.
-
Scénarios au-delà des fonctionnalités de vue matérialisées standard :
- Tables cibles avec une durée de conservation plus longue que la source
- Empêcher la recomputation lorsqu’une table de dimension change
- Évolution du schéma sans recomputer l’historique entier
Créer un flux REPLACE WHERE
Utilisez la FLOW REPLACE WHERE clause inline avec CREATE OR REFRESH STREAMING TABLE:
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.channel = 'PREVIEW')
SCHEDULE EVERY 1 DAY
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT
o.order_id,
o.date,
o.region,
p.product_name,
o.qty,
o.price
FROM orders_fct o
JOIN product_dim p
ON o.product_id = p.product_id;
Pendant l’actualisation, toutes les lignes de la table cible qui correspondent au prédicat sont supprimées, la requête source est recomputée pour cette même plage de prédicats et les nouveaux résultats sont insérés. Dans cet exemple, toutes les lignes des 7 derniers jours sont supprimées de orders_enriched et recalculées à partir de la requête source.
Vous n’avez pas besoin d’ajouter le prédicat à la requête source. Le moteur du pipeline l’applique automatiquement lors de la lecture depuis la source.
Note
BY NAME est obligatoire. Elle garantit que les colonnes sont mises en correspondance par nom plutôt que par position.
Remplissage des données historiques
Pour effectuer des rechargements, exécutez des instructions DML directement sur la table cible :
INSERT INTO orders_enriched
SELECT *
FROM orders_enriched_legacy
WHERE date < '2025-01-01';
Comportement d’actualisation complète
Une actualisation complète d’un flux REPLACE WHERE réexécute la requête source en utilisant uniquement le prédicat actuel. Les lignes insérées par des instructions DML en dehors de la plage de prédicats actuelles sont définitivement supprimées.
Avertissement
Une actualisation complète efface toutes les données existantes et réexécute le flux à l’aide de son prédicat défini uniquement. Si un pipeline est en cours d’exécution depuis un an avec un prédicat de 7 jours, une actualisation complète a pour résultat que la table ne contient plus que les données des 7 derniers jours. Toutes les lignes plus anciennes sont définitivement supprimées.
REFRESH STREAMING TABLE orders_enriched FULL;
Pour empêcher les actualisations complètes sur une table, définissez la propriété pipelines.reset.allowed de table sur false:
CREATE OR REFRESH STREAMING TABLE orders_enriched
TBLPROPERTIES (pipelines.reset.allowed = 'false')
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
...
Actualisation incrémentielle
Les flux REPLACE WHERE utilisent l’actualisation incrémentielle si possible, en retraiteant uniquement les données sources qui ont changé depuis la dernière actualisation plutôt que de recomputer la fenêtre de remplacement entière. L’actualisation incrémentielle nécessite une capacité de calcul serverless.
Lorsque l’actualisation incrémentielle s’applique
Tous les éléments suivants doivent être vrais :
- Le pipeline s’exécute sur une infrastructure d’informatique sans serveur.
- La forme de requête est prise en charge. Consultez l’actualisation incrémentielle pour la liste des opérateurs pris en charge.
- Le prédicat fait référence aux colonnes de base d’une table source. Les prédicats sur les valeurs dérivées, comme les résultats de fonctions d’agrégation ou de fenêtre, ne peuvent pas être transmis à une source, ce qui désactive l’actualisation incrémentielle.
- Aucune DML externe n’a modifié les lignes dans la fenêtre de remplacement actuelle. DML qui modifie les lignes en dehors de la fenêtre active n’est pas affecté.
- La fenêtre de remplacement actuelle n’inclut pas les lignes exclues par le prédicat précédent. Si vous élargissez le prédicat pour couvrir une plage non traitée précédemment, cette actualisation revient à une recomputation complète. Les actualisations suivantes sont à nouveau éligibles pour l’actualisation incrémentielle.
- Le prédicat est déterministe. Prédicats utilisant des fonctions non déterministes telles que
rand()désactiver l’actualisation incrémentielle. Les fonctions temporelles telles quecurrent_date()sont autorisées.
La première actualisation d’un flux est toujours un calcul complet. Si aucune condition n’est remplie, cette actualisation revient à une recomputation complète de la fenêtre de remplacement actuelle.
Meilleures pratiques pour l’actualisation incrémentielle
Suivez ces instructions afin que les flux REPLACE WHERE restent éligibles pour l’actualisation incrémentielle.
Utiliser une limite inférieure mobile
Les prédicats avec une limite inférieure mobile restent éligibles pour l’actualisation incrémentielle indéfiniment.
FLOW REPLACE WHERE date >= date_add(current_date(), -7)
Une limite supérieure mobile, telle que date BETWEEN date_add(current_date(), -7) AND current_date(), peut déplacer la fenêtre de manière à inclure des lignes auparavant exclues, entraînant un recours ponctuel à un recalcul complet.
Inclure la colonne de prédicat dans GROUP BY
Lors de l’agrégation, incluez la colonne de prédicat dans GROUP BY afin que le moteur puisse pousser le prédicat en dessous de l’agrégation.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT date, region, SUM(amount) AS total
FROM sales
GROUP BY date, region;
Si la colonne du prédicat est absente dans GROUP BY, le prédicat ne peut pas être repoussé sous l’agrégation et la source est parcourue entièrement.
Inclure la colonne de prédicat dans les clés de jointure
Incluez la colonne de prédicat dans la condition de jointure afin que le moteur puisse découper toutes les sources jointes.
FLOW REPLACE WHERE date >= date_add(current_date(), -7) BY NAME
SELECT f.date, f.user_id, d.region, f.revenue
FROM fact f
JOIN dim d ON f.date = d.date AND f.user_id = d.user_id;
Si une table jointe n’expose pas la colonne de prédicat, cette table est analysée en intégralité à chaque actualisation.
Diagnostiquer le retour à la recomputation complète
Lorsqu’une actualisation revient à une recomputation complète, la raison est signalée dans l’événement planning_information du flux. Consultez Consulter les journaux des événements du pipeline. Le tableau suivant répertorie les raisons signalées dans l’événement :
| Raison | Meaning |
|---|---|
EXTERNAL_CHANGE_IN_REPLACE_WINDOW |
Une instruction DML externe a modifié des lignes dans la fenêtre de remplacement actuelle. |
REPLACE_WHERE_NOT_DETERMINISTIC |
Le prédicat utilise des expressions non déterministes. |
PRIOR_REPLACE_WHERE_NOT_DETERMINISTIC |
L’actualisation précédente a utilisé un prédicat non déterministe. |
UNSUPPORTED_REPLACE_WHERE_PREDICATE |
Le prédicat ne peut être appliqué à aucune source, la fenêtre actuelle inclut des lignes non traitées par le prédicat précédent, ou l’exécution utilise une substitution de prédicat. |
Exemples
Les exemples suivants montrent des modèles de flux REPLACE WHERE courants.
Exemple 1 : Conserver les agrégats historiques d’une source à durée de rétention limitée
Cet exemple conserve indéfiniment les agrégats quotidiens, même après l’expiration des données brutes dans la table source (rétention de 3 jours) :
CREATE OR REFRESH STREAMING TABLE events_agg
FLOW REPLACE WHERE date >= date_add(current_date(), -3) BY NAME
SELECT
date,
key,
SUM(val) AS agg
FROM events_raw
GROUP BY ALL;
Exemple 2 : Empêcher la recomputation lorsqu’une table de dimension change
Cet exemple montre comment conserver les lignes de faits historiques inchangées lorsque les attributs de dimension changent :
CREATE OR REFRESH STREAMING TABLE fact_dim_join
FLOW REPLACE WHERE f.date >= date_add(current_date(), -1) BY NAME
SELECT
f.date,
f.user_id,
d.region,
f.revenue
FROM fact_table f
JOIN dim_users d
ON f.user_id = d.user_id;
Si la région d’un utilisateur change, seules les lignes récentes sont recomputées. Les lignes historiques conservent la valeur de région au moment où elles ont été écrites.
Exemple 3 : Ajouter une nouvelle métrique sans recomputer l’historique complet
Cet exemple montre comment faire évoluer une définition de table et remplir uniquement une plage ciblée :
Définissez la table initiale :
CREATE OR REFRESH STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks FROM clickstream_raw GROUP BY ALL;Mettez à jour la requête pour ajouter
uniq_users:CREATE OR REFRESH STREAMING TABLE clickstream_daily FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME SELECT event_date, page_id, COUNT(*) AS clicks, COUNT(DISTINCT user_id) AS uniq_users FROM clickstream_raw GROUP BY ALL;Les lignes antérieures à la fenêtre de 7 jours contiennent
NULLpouruniq_users.
Exemple 4 : Itérer sur une petite fenêtre avant de remplir l’historique complet
Cet exemple montre comment valider la logique de requête sur une petite fenêtre de données avant de traiter la plage historique complète.
Commencez par une courte fenêtre pour valider les métriques et itérer sur la logique métier avec des coûts de calcul inférieurs :
CREATE OR REFRESH STREAMING TABLE revenue_attribution
FLOW REPLACE WHERE event_date >= date_add(current_date(), -7) BY NAME
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
GROUP BY ALL;
Une courte fenêtre recompute uniquement les 7 derniers jours à chaque actualisation, donc révisez la requête autant de fois que nécessaire avant de valider une exécution historique complète.
Une fois la requête finalisée, utilisez DML pour renvoyer la plage d’historique complète :
INSERT INTO revenue_attribution
SELECT
event_date,
campaign_id,
SUM(revenue) AS total_revenue
FROM marketing_events
WHERE event_date < date_add(current_date(), -7)
GROUP BY ALL;