Note
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de changer d’annuaire.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de changer d’annuaire.
Cette page décrit les concepts de base du filigrane et fournit des recommandations pour l’utilisation de filigranes dans des opérations courantes de diffusion en continu avec état. Vous devez appliquer des filigranes aux opérations de streaming avec état pour éviter d’augmenter infiniment la quantité de données conservées dans l’état, ce qui peut introduire des problèmes de mémoire ou augmenter les latences de traitement pendant les opérations de streaming de longue durée.
Qu’est-ce qu’un filigrane ?
Structured Streaming utilise des filigranes pour contrôler le seuil de durée pour continuer le traitement des mises à jour d'une entité d'état donnée. Voici des exemples courants d’entités d’état :
- Agrégations sur une fenêtre de temps.
- Clés uniques dans une jointure entre deux flux.
Lorsque vous définissez un filigrane, vous spécifiez un champ d’horodatage et un seuil de filigrane sur un DataFrame en flux. À mesure que de nouvelles données arrivent, le gestionnaire d'état suit l'horodatage le plus récent dans le champ spécifié et traite tous les enregistrements dans le seuil de latence.
L'exemple suivant applique un seuil de watermark de 10 minutes à un comptage basé sur des fenêtres de temps.
from pyspark.sql.functions import window
(df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
)
Dans cet exemple :
- La colonne
event_timeest utilisée pour définir un marqueur temporel de 10 minutes et une fenêtre déroulante de 5 minutes. - Un décompte est effectué pour chaque
idobservé, pour chaque fenêtre de 5 minutes qui ne se chevauchent pas. - Les informations d’état sont conservées pour chaque nombre jusqu’à ce que la fin de la fenêtre soit 10 minutes plus ancienne que le plus récent élément observé
event_time.
Important
Les seuils de filigrane veillent à ce des enregistrements arrivant dans le seuil spécifié sont traités en fonction de la sémantique de la requête définie. Les enregistrements arrivant en retard en dehors du seuil spécifié peuvent toujours être traités à l’aide de métriques de requête, mais cela n’est pas garanti.
Comment les filigranes ont-ils un impact sur le temps de traitement et le débit ?
Les filigranes interagissent avec les modes de sortie pour contrôler le moment où les données sont écrites dans le récepteur. Étant donné que les filigranes réduisent la quantité totale d'informations d'état à traiter, une utilisation optimale des filigranes est essentielle pour optimiser le débit de traitement dans le streaming à état conservé.
Note
Pas tous les modes de sortie ne sont pris en charge pour toutes les opérations nécessitant un état.
Filigranes et mode de sortie pour les agrégations par fenêtre
Le tableau suivant détaille le traitement des requêtes avec agrégation sur un horodatage marqué d'une estampille spécifiée :
| Mode de sortie | Comportement |
|---|---|
| Ajouter | Les lignes sont écrites dans la table cible une fois le seuil de filigrane passé. Toutes les écritures sont retardées en fonction du seuil de retard. L’ancien état d’agrégation est supprimé une fois le seuil passé. |
| Mise à jour | Les lignes sont écrites dans la table cible à mesure que les résultats sont calculés et peuvent être mises à jour et remplacées à mesure que de nouvelles données arrivent. L’ancien état d’agrégation est supprimé une fois le seuil passé. |
| Terminé | L’état d’agrégation n’est pas supprimé. La table cible est réécrite à chaque déclenchement. |
Filigranes et sortie pour les jointures de flux à flux
Les jointures entre plusieurs flux prennent uniquement en charge le mode d’ajout, et les enregistrements correspondants sont écrits dans chaque lot qu’ils sont découverts. Pour les jointures internes, Databricks recommande de définir un seuil de filigrane sur chaque source de données de streaming. Cela permet d’ignorer les informations d’état pour les anciens enregistrements. Quand il n'y a pas de filigranes, le Streaming structuré cherche à joindre chaque clé de chaque côté de la jointure à chaque déclencheur.
Streaming structuré possède une sémantique spéciale pour prendre en charge les jointures externes. Le marquage est obligatoire pour les jointures externes, car il indique quand une clé doit être écrite avec une valeur null après ne pas avoir trouvé de correspondance. Bien que les jointures externes puissent être utiles pour l’enregistrement des enregistrements qui ne sont jamais mis en correspondance pendant le traitement des données, car les jointures écrivent uniquement dans des tables sous forme d'opérations d'ajout, ces données manquantes ne sont pas enregistrées tant que le seuil de latence n'est pas passé.
Contrôler le seuil des données tardives avec une politique de filigrane multiple dans le streaming structuré
Lorsque vous utilisez plusieurs entrées de Structured Streaming, vous pouvez définir plusieurs limites afin de contrôler les seuils de tolérance pour les données tardives. La configuration des watermarks vous permet de contrôler les informations d'état et a une incidence sur la latence.
Une requête de diffusion en continu peut avoir plusieurs flux d’entrée réunis ou joints. Chacun des flux d’entrée peut avoir un seuil distinct de données tardives, qui doit être toléré pour les opérations avec état. Spécifiez ces seuils à l’aide de withWatermarks("eventTime", delay) sur chacun des flux d’entrée. Voici un exemple de requête avec des jointures de flux à flux.
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
Lors de l'exécution de la requête, Structured Streaming suit individuellement l'heure maximale de l'événement observée dans chaque flux d'entrée, calcule des watermarks en fonction du retard correspondant, et choisit avec celles-ci un seul watermark global à utiliser pour les opérations avec état. Par défaut, la valeur minimale est choisie comme repère d'eau global car elle empêche que les données ne soient écartées par erreur parce qu'elles sont considérées comme arrivées trop tard si l'un des flux prend du retard par rapport aux autres (par exemple, si l'un des flux cesse de recevoir des données en raison de défaillances en amont). En d’autres termes, la limite globale se déplace en toute sécurité au rythme du flux le plus lent, et le résultat de la requête est retardé en conséquence.
Si vous souhaitez obtenir des résultats plus rapide, vous pouvez définir la stratégie de limites multiples pour choisir la valeur maximale comme limite globale en définissant la configuration SQL spark.sql.streaming.multipleWatermarkPolicy sur max (la configuration par défaut est min). Cela permet au repère global de se déplacer au rythme du flux le plus rapide. Toutefois, cette configuration a pour effet d’annuler les données des flux les plus lents. Databricks recommande d’utiliser cette configuration judicieusement.
Appliquer des filigranes à des opérations distinctes
L'opérateur distinct est un opérateur avec état qui nécessite des repères temporels pour empêcher la croissance de l'état non bornée. Sans filigranes, Structured Streaming tente de suivre indéfiniment chaque enregistrement unique, ce qui peut entraîner des problèmes de mémoire ou des latences de traitement accrues.
Lorsque vous appliquez distinct à un DataFrame de streaming, vous devez spécifier un watermark sur un champ d’horodatage. Le filigrane contrôle la durée pendant laquelle le gestionnaire d’état conserve les enregistrements pour la déduplication. Une fois le seuil de filigrane dépassé, les anciens enregistrements sont supprimés de l'état du système.
L'exemple suivant applique un filigrane à l'opération distinct :
Python
streamingDf = spark.readStream. ... # columns: eventTime, id, value, ...
# Apply watermark before distinct operation
(streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
)
Scala
val streamingDf = spark.readStream. ... // columns: eventTime, id, value, ...
// Apply watermark before distinct operation
streamingDf
.withWatermark("eventTime", "1 hour")
.distinct()
Dans cet exemple, les enregistrements en double arrivant dans les 1 heures suivant la dernière observation eventTime sont supprimés du flux. Les informations d’état pour la déduplication sont supprimées une fois le seuil dépassé.
Important
Si vous devez dédupliquer sur des colonnes spécifiques plutôt que sur toutes les colonnes, utilisez dropDuplicates() ou dropDuplicatesWithinWatermark() au lieu de distinct. Pour plus d’informations, consultez la section suivante.
Supprimer des doublons dans un filigrane
Dans Databricks Runtime 13.3 LTS ou version ultérieure, vous pouvez dédupliquer des enregistrements en respectant un seuil de filigrane à l'aide d'un identifiant unique.
Streaming structuré garantit un traitement exactement une fois, mais ne déduplique pas automatiquement les enregistrements provenant des sources de données. Vous pouvez utiliser dropDuplicatesWithinWatermark pour dédupliquer des enregistrements sur n’importe quel champ spécifié, ce qui vous permet de supprimer les doublons d’un flux, même en cas de différence de champs (telle que l’heure d’événement ou l’heure d’arrivée).
Il est garanti que les enregistrements dupliqués qui arrivent dans le délai de référence spécifié seront supprimés. Cette garantie est stricte dans une seule direction et vous pouvez également supprimer des enregistrements en double qui arrivent en dehors du seuil spécifié. Pour supprimer tous les doublons, vous devez définir le seuil de délai d’un filigrane supérieur au nombre maximal de différences d’horodatage parmi les événements dupliqués.
Vous devez spécifier un filigrane pour utiliser la méthode dropDuplicatesWithinWatermark, comme dans l’exemple suivant :
Python
streamingDf = spark.readStream. ...
# deduplicate using guid column with watermark based on eventTime column
(streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(["guid"])
)
Scala
val streamingDf = spark.readStream. ... // columns: guid, eventTime, ...
// deduplicate using guid column with watermark based on eventTime column
streamingDf
.withWatermark("eventTime", "10 hours")
.dropDuplicatesWithinWatermark(Seq("guid"))