Repartition d’état à la demande pour les requêtes de streaming avec état

Le repartitionnement d’état à la demande vous permet de redimensionner le nombre de partitions pour une requête Structured Streaming avec état sans perdre l’état du point de contrôle.

En l’absence de repartitionnement d’état à la demande, vous définissez le nombre de partitions de shuffle lors de la création du checkpoint. Si vous changez spark.sql.shuffle.partitions, les requêtes avec des points de contrôle existants ignorent la nouvelle valeur. L’application d’un nouveau nombre de partitions vous oblige à redémarrer la requête avec un nouveau point de contrôle.

Le repartitionnement d’état à la demande présente les avantages suivants :

  • Ajustez les requêtes en redimensionnant le nombre de partitions sans regénérer le point de contrôle.
  • Ajustez les requêtes à la hausse ou à la baisse en fonction de l’évolution de la charge de travail.

Requirements

Modifier le nombre de partitions

Utilisez la configuration Spark spark.sql.streaming.stateStore.partitions et redémarrez la requête pour modifier le nombre de partitions de shuffle et d’état de streaming :

Python

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()

Scala

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()

Pour les requêtes avec état, spark.sql.streaming.stateStore.partitions est prioritaire sur spark.sql.shuffle.partitions. Une fois la requête redémarrée et la dernière microbatch planifiée terminée, la requête exécute une opération de repartition pour redistribuer les données d’état dans le nouveau nombre de partitions. Une fois l’opération de repartition terminée, la requête reprend le traitement.

État de repartition du moniteur

Une fois le microbatch suivant terminé, StreamingQueryProgress les événements incluent la durée de l’opération de repartition. Dans les métriques d’un durationMs événement, controlBatch.REPARTITION affiche la valeur de durée en millisecondes. Des tailles d’état plus importantes peuvent augmenter le temps nécessaire au repartitionnement. Consultez la surveillance des requêtes de streaming structuré sur Azure Databricks.

Example

L’exemple suivant réduit le nombre de partitions de shuffle d’une requête de 200, la valeur par défaut, à 100. Arrêtez la requête, définissez le nouveau nombre de partitions et redémarrez :

Python

# Start the query with the default partition count (200)
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

# Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

# Restart the query with the same options
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

Scala

// Start the query with the default partition count (200)
val query = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()

// Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

// Restart the query with the same options
val query2 = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()