Notes
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Une requête de Structured Streaming avec état nécessite des mises à jour incrémentielles d’informations d’état intermédiaires, tandis qu’une requête de Structured Streaming sans état effectue uniquement le suivi des informations sur les lignes traitées entre la source et le récepteur.
Les opérations avec état incluent l'agrégation en continu, le streaming dropDuplicates
, les jointures entre flux et les applications avec état personnalisées.
Les informations d’état intermédiaires requises pour les requêtes Structured Streaming avec état peuvent entraîner des problèmes de latence et de production inattendus s’ils sont mal configurés.
Dans Databricks Runtime 13.3 LTS et versions ultérieures, vous pouvez activer le point de contrôle du journal des modifications avec RocksDB pour réduire la durée des points de contrôle et la latence de bout en bout pour les charges de travail de flux structuré. Databricks recommande d’activer le point de contrôle du journal des modifications pour toutes les requêtes avec état Flux structuré. Voir Activer le point de contrôle du journal des modifications.
Optimiser des requêtes Structured Streaming avec état
La gestion des informations d’état intermédiaire des requêtes de Structured Streaming avec état peut aider à éviter des problèmes inattendus de latence et de production.
Les recommandations de Databricks sont les suivantes :
- Utilisez des instances optimisées pour le calcul en tant que workers.
- Définissez le nombre de partitions de répartition entre 1 et 2 fois le nombre de cœurs dans le cluster.
- Définissez la configuration
spark.sql.streaming.noDataMicroBatches.enabled
surfalse
dans la SparkSession. Cela empêche le moteur de traitement par micro-lots de diffusion en continu de traiter des micro-lots qui ne contiennent pas de données. Notez également que la configuration de cette option surfalse
peut entraîner des opérations avec état qui utilisent des filigranes ou des délais d'attente de traitement pour retarder la sortie des données jusqu'à ce que de nouvelles données arrivent, au lieu d'être immédiate.
Databricks recommande l’utilisation de RocksDB avec les points de contrôle du journal des modifications pour gérer l’état des flux avec état. Consultez Configurer un stockage d’état RocksDB sur Azure Databricks.
Remarque
Le schéma de gestion d’état ne peut pas être modifié entre les redémarrages de la requête. Si une requête a été démarrée avec la gestion par défaut, vous devez la reprendre depuis le début avec un nouvel emplacement de point de contrôle pour modifier le stockage d'état.
Utiliser plusieurs opérateurs avec état dans Structured Streaming
Dans Databricks Runtime 13.3 LTS et versions ultérieures, Azure Databricks offre une prise en charge avancée des opérateurs avec état dans les charges de travail Flux structuré. Vous pouvez désormais chaîner plusieurs opérations avec état, ce qui signifie que vous pouvez alimenter la sortie d’une opération, telle qu’une agrégation fenêtrée, vers une autre opération avec état, telle qu’une jointure.
Dans Databricks Runtime 16.2 et versions ultérieures, vous pouvez utiliser transformWithState
dans les charges de travail avec des opérateurs d'état multiples. Consultez Générer une application avec état personnalisé.
Les exemples suivants illustrent plusieurs modèles que vous pouvez utiliser.
Important
Les limitations suivantes existent lors de l’utilisation de plusieurs opérateurs avec état :
- Les opérateurs avec état personnalisés hérités (
FlatMapGroupWithState
etapplyInPandasWithState
ne sont pas pris en charge). - Seul le mode de sortie d’ajout est pris en charge.
Agrégation de fenêtres de temps chaînées
Python
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String }
# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
window(words.timestamp, "10 minutes", "5 minutes"),
words.word
).count()
# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
window(window_time(windowedCounts.window), "1 hour"),
windowedCounts.word
).count()
Langage de programmation Scala
import spark.implicits._
val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
window($"timestamp", "10 minutes", "5 minutes"),
$"word"
).count()
// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
window($"window", "1 hour"),
$"word"
).count()
Agrégation de fenêtres de temps dans deux flux différents suivis d’une jointure de fenêtre flux-flux
Python
clicksWindow = clicksWithWatermark.groupBy(
clicksWithWatermark.clickAdId,
window(clicksWithWatermark.clickTime, "1 hour")
).count()
impressionsWindow = impressionsWithWatermark.groupBy(
impressionsWithWatermark.impressionAdId,
window(impressionsWithWatermark.impressionTime, "1 hour")
).count()
clicksWindow.join(impressionsWindow, "window", "inner")
Langage de programmation Scala
val clicksWindow = clicksWithWatermark
.groupBy(window("clickTime", "1 hour"))
.count()
val impressionsWindow = impressionsWithWatermark
.groupBy(window("impressionTime", "1 hour"))
.count()
clicksWindow.join(impressionsWindow, "window", "inner")
Jointure de l’intervalle de temps flux-flux suivie de l’agrégation de fenêtre de temps
Python
joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
"leftOuter" # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined.groupBy(
joined.clickAdId,
window(joined.clickTime, "1 hour")
).count()
Langage de programmation Scala
val joined = impressionsWithWatermark.join(
clicksWithWatermark,
expr("""
clickAdId = impressionAdId AND
clickTime >= impressionTime AND
clickTime <= impressionTime + interval 1 hour
"""),
joinType = "leftOuter" // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)
joined
.groupBy($"clickAdId", window($"clickTime", "1 hour"))
.count()
Rééquilibrage d’état pour le flux structuré
Le rééquilibrage d’état est activé par défaut pour toutes les charges de travail de streaming dans Lakeflow Declarative Pipelines. Dans Databricks Runtime 11.3 LTS et versions ultérieures, vous pouvez définir l’option de configuration suivante dans la configuration du cluster Spark pour activer le rééquilibrage de l’état :
spark.sql.streaming.statefulOperator.stateRebalancing.enabled true
Le rééquilibrage d’état bénéficie aux pipelines Structured Streaming avec état qui connaissent des événements de redimensionnement de cluster. Les opérations de streaming sans état n’en bénéficient pas, même si la taille de cluster varie.
Remarque
La mise à l’échelle automatique du calcul présente des limitations pour la réduction de la taille du cluster pour les charges de travail Structured Streaming. Databricks recommande d’utiliser Lakeflow Declarative Pipelines avec une mise à l’échelle automatique améliorée pour les charges de travail de streaming. Voir Optimiser l’utilisation du cluster des pipelines déclaratifs Lakeflow avec la mise à l’échelle automatique.
Le redimensionnement du cluster déclenche le rééquilibrage de l’état. Les micro-lots peuvent avoir une latence plus élevée lors des événements de rééquilibrage, lorsque l’état passe du stockage cloud aux nouveaux exécuteurs.