Note
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de changer d’annuaire.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de changer d’annuaire.
Cette page explique les requêtes de streaming structuré avec état, notamment les opérations avec état, les recommandations pour l'optimisation, le chaînage de plusieurs opérateurs avec état et le rééquilibrage de l'état.
Une requête de Structured Streaming avec état nécessite des mises à jour incrémentielles d’informations d’état intermédiaires, tandis qu’une requête de Structured Streaming sans état effectue uniquement le suivi des informations sur les lignes traitées entre la source et le récepteur. Pour connaître les fonctionnalités d’optimisation disponibles pour les requêtes sans état, consultez Optimiser les requêtes de diffusion en continu sans état.
Opérations avec état
Les opérations avec état incluent l’agrégation en continu, distinct, les jointures flux-à-flux, et les applications avec état personnalisées.
Les informations d’état intermédiaires requises pour les requêtes Structured Streaming avec état peuvent entraîner des problèmes de latence et de production inattendus s’ils sont mal configurés.
Dans Databricks Runtime 13.3 LTS ou version ultérieure, vous pouvez activer la journalisation des points de contrôle avec RocksDB pour réduire la durée des points de contrôle et la latence "end-to-end" pour les charges de travail Structured Streaming. Databricks recommande d’activer le point de contrôle du journal des modifications pour toutes les requêtes avec état Flux structuré. Voir Activer le point de contrôle du journal des modifications.
Optimiser les requêtes en streaming structurées avec état
Databricks recommande ce qui suit pour les requêtes Structured Streaming avec état :
- Utilisez des instances optimisées pour le calcul en tant que workers.
- Définissez le nombre de partitions de répartition entre 1 et 2 fois le nombre de cœurs dans le cluster.
- Définissez la configuration
spark.sql.streaming.noDataMicroBatches.enabledsurfalsedans la SparkSession. Cela empêche le moteur de traitement de micro-lots en continu de traiter des micro-lots sans données. La définition de cette configurationfalsepourrait également entraîner des opérations avec état qui utilisent des filigranes ou des délais d'attente de traitement et ne produisent pas de sortie de données jusqu'à ce que de nouvelles données arrivent, plutôt qu'immédiatement.
Databricks recommande l’utilisation de RocksDB avec les points de contrôle du journal des modifications pour gérer l’état des flux. Consultez Configurer un stockage d’état RocksDB sur Azure Databricks.
Remarque
Le schéma de gestion d’état ne peut pas être modifié entre les redémarrages de la requête. Si une requête a été démarrée avec la gestion par défaut, vous devez la reprendre depuis le début avec un nouvel emplacement de point de contrôle pour modifier le stockage d'état.
Utiliser plusieurs opérateurs à état dans Structured Streaming
Dans Databricks Runtime 13.3 LTS ou version ultérieure, Azure Databricks offre une prise en charge avancée des opérateurs avec état dans les charges de travail Structured Streaming. Vous pouvez enchaîner plusieurs opérateurs avec état, ce qui signifie que vous pouvez alimenter la sortie d'une opération, telle qu'une agrégation fenêtrée, vers une autre opération avec état, telle qu'une jointure.
Dans Databricks Runtime 16.2 ou version ultérieure, vous pouvez utiliser transformWithState dans les charges de travail avec plusieurs opérateurs avec état. Consultez Générer une application avec état personnalisé.
Les exemples suivants illustrent plusieurs modèles que vous pouvez utiliser.
Important
Les limitations suivantes existent lors de l’utilisation de plusieurs opérateurs avec état :
- Les opérateurs avec état personnalisés hérités (
FlatMapGroupWithStateetapplyInPandasWithState) ne sont pas pris en charge. - Seul le mode d'ajout en sortie est pris en charge.
Agrégation de fenêtres de temps chaînées
Python
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Langage de programmation Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
Agrégation de fenêtres de temps dans deux flux différents suivie d'une jointure de fenêtres flux-flux
Python
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Langage de programmation Scala
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
Jointure de l’intervalle temporel flux-flux suivie de l’agrégation par fenêtre temporelle
Python
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Langage de programmation Scala
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
Rééquilibrage d'état pour Structured Streaming
Le rééquilibrage d’état est activé par défaut pour toutes les charges de travail de streaming dans les pipelines déclaratifs Spark Lakeflow. Dans Databricks Runtime 11.3 LTS ou version ultérieure, vous pouvez définir l’option de configuration suivante dans la configuration du cluster Spark pour activer le rééquilibrage d’état :
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
Le rééquilibrage d’état bénéficie aux pipelines Structured Streaming avec état qui connaissent des événements de redimensionnement de cluster. Les opérations de streaming sans état ne bénéficient pas, quelle que soit la taille du cluster.
Remarque
La mise à l’échelle automatique du calcul présente des limitations pour réduire la taille du cluster pour les charges de travail Structured Streaming. Databricks recommande d’utiliser des pipelines déclaratifs Spark Lakeflow avec une mise à l’échelle automatique améliorée pour les charges de travail de streaming. Consultez Optimiser l’utilisation du cluster des pipelines déclaratifs Spark Lakeflow avec mise à l’échelle automatique.
Le redimensionnement du cluster déclenche le rééquilibrage de l’état. Les micro-lots peuvent avoir une latence plus élevée lors des événements de rééquilibrage, lorsque l’état passe du stockage cloud aux nouveaux exécuteurs.