Partager via


Qu’est-ce que la diffusion en continu avec état ?

Une requête de Structured Streaming avec état nécessite des mises à jour incrémentielles d’informations d’état intermédiaires, tandis qu’une requête de Structured Streaming sans état effectue uniquement le suivi des informations sur les lignes traitées entre la source et le récepteur.

Les opérations avec état incluent l'agrégation en continu, le streaming dropDuplicates, les jointures entre flux et les applications avec état personnalisées.

Les informations d’état intermédiaires requises pour les requêtes Structured Streaming avec état peuvent entraîner des problèmes de latence et de production inattendus s’ils sont mal configurés.

Dans Databricks Runtime 13.3 LTS et versions ultérieures, vous pouvez activer le point de contrôle du journal des modifications avec RocksDB pour réduire la durée des points de contrôle et la latence de bout en bout pour les charges de travail de flux structuré. Databricks recommande d’activer le point de contrôle du journal des modifications pour toutes les requêtes avec état Flux structuré. Voir Activer le point de contrôle du journal des modifications.

Optimiser des requêtes Structured Streaming avec état

La gestion des informations d’état intermédiaire des requêtes de Structured Streaming avec état peut aider à éviter des problèmes inattendus de latence et de production.

Les recommandations de Databricks sont les suivantes :

  • Utilisez des instances optimisées pour le calcul en tant que workers.
  • Définissez le nombre de partitions de répartition entre 1 et 2 fois le nombre de cœurs dans le cluster.
  • Définissez la configuration spark.sql.streaming.noDataMicroBatches.enabled sur false dans la SparkSession. Cela empêche le moteur de traitement par micro-lots de diffusion en continu de traiter des micro-lots qui ne contiennent pas de données. Notez également que la configuration de cette option sur false peut entraîner des opérations avec état qui utilisent des filigranes ou des délais d'attente de traitement pour retarder la sortie des données jusqu'à ce que de nouvelles données arrivent, au lieu d'être immédiate.

Databricks recommande l’utilisation de RocksDB avec les points de contrôle du journal des modifications pour gérer l’état des flux avec état. Consultez Configurer un stockage d’état RocksDB sur Azure Databricks.

Remarque

Le schéma de gestion d’état ne peut pas être modifié entre les redémarrages de la requête. Si une requête a été démarrée avec la gestion par défaut, vous devez la reprendre depuis le début avec un nouvel emplacement de point de contrôle pour modifier le stockage d'état.

Utiliser plusieurs opérateurs avec état dans Structured Streaming

Dans Databricks Runtime 13.3 LTS et versions ultérieures, Azure Databricks offre une prise en charge avancée des opérateurs avec état dans les charges de travail Flux structuré. Vous pouvez désormais chaîner plusieurs opérations avec état, ce qui signifie que vous pouvez alimenter la sortie d’une opération, telle qu’une agrégation fenêtrée, vers une autre opération avec état, telle qu’une jointure.

Dans Databricks Runtime 16.2 et versions ultérieures, vous pouvez utiliser transformWithState dans les charges de travail avec des opérateurs d'état multiples. Consultez Générer une application avec état personnalisé.

Les exemples suivants illustrent plusieurs modèles que vous pouvez utiliser.

Important

Les limitations suivantes existent lors de l’utilisation de plusieurs opérateurs avec état :

  • Les opérateurs avec état personnalisés hérités (FlatMapGroupWithState et applyInPandasWithState ne sont pas pris en charge).
  • Seul le mode de sortie d’ajout est pris en charge.

Agrégation de fenêtres de temps chaînées

Python

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()

# Group the windowed data by another window and word and compute the count of each group
anotherWindowedCounts = windowedCounts.groupBy(
    window(window_time(windowedCounts.window), "1 hour"),
    windowedCounts.word
).count()

Langage de programmation Scala

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()

// Group the windowed data by another window and word and compute the count of each group
val anotherWindowedCounts = windowedCounts.groupBy(
  window($"window", "1 hour"),
  $"word"
).count()

Agrégation de fenêtres de temps dans deux flux différents suivis d’une jointure de fenêtre flux-flux

Python

clicksWindow = clicksWithWatermark.groupBy(
  clicksWithWatermark.clickAdId,
  window(clicksWithWatermark.clickTime, "1 hour")
).count()

impressionsWindow = impressionsWithWatermark.groupBy(
  impressionsWithWatermark.impressionAdId,
  window(impressionsWithWatermark.impressionTime, "1 hour")
).count()

clicksWindow.join(impressionsWindow, "window", "inner")

Langage de programmation Scala

val clicksWindow = clicksWithWatermark
  .groupBy(window("clickTime", "1 hour"))
  .count()

val impressionsWindow = impressionsWithWatermark
  .groupBy(window("impressionTime", "1 hour"))
  .count()

clicksWindow.join(impressionsWindow, "window", "inner")

Jointure de l’intervalle de temps flux-flux suivie de l’agrégation de fenêtre de temps

Python

joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
    """),
  "leftOuter"                 # can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined.groupBy(
  joined.clickAdId,
  window(joined.clickTime, "1 hour")
).count()

Langage de programmation Scala

val joined = impressionsWithWatermark.join(
  clicksWithWatermark,
  expr("""
    clickAdId = impressionAdId AND
    clickTime >= impressionTime AND
    clickTime <= impressionTime + interval 1 hour
  """),
  joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter", "fullOuter", "leftSemi"
)

joined
  .groupBy($"clickAdId", window($"clickTime", "1 hour"))
  .count()

Rééquilibrage d’état pour le flux structuré

Le rééquilibrage d’état est activé par défaut pour toutes les charges de travail de streaming dans Lakeflow Declarative Pipelines. Dans Databricks Runtime 11.3 LTS et versions ultérieures, vous pouvez définir l’option de configuration suivante dans la configuration du cluster Spark pour activer le rééquilibrage de l’état :

spark.sql.streaming.statefulOperator.stateRebalancing.enabled true

Le rééquilibrage d’état bénéficie aux pipelines Structured Streaming avec état qui connaissent des événements de redimensionnement de cluster. Les opérations de streaming sans état n’en bénéficient pas, même si la taille de cluster varie.

Remarque

La mise à l’échelle automatique du calcul présente des limitations pour la réduction de la taille du cluster pour les charges de travail Structured Streaming. Databricks recommande d’utiliser Lakeflow Declarative Pipelines avec une mise à l’échelle automatique améliorée pour les charges de travail de streaming. Voir Optimiser l’utilisation du cluster des pipelines déclaratifs Lakeflow avec la mise à l’échelle automatique.

Le redimensionnement du cluster déclenche le rééquilibrage de l’état. Les micro-lots peuvent avoir une latence plus élevée lors des événements de rééquilibrage, lorsque l’état passe du stockage cloud aux nouveaux exécuteurs.