Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Le repartitionnement d’état à la demande vous permet de redimensionner le nombre de partitions pour une requête Structured Streaming avec état sans perdre l’état du point de contrôle.
En l’absence de repartitionnement d’état à la demande, vous définissez le nombre de partitions de shuffle lors de la création du checkpoint. Si vous changez spark.sql.shuffle.partitions, les requêtes avec des points de contrôle existants ignorent la nouvelle valeur. L’application d’un nouveau nombre de partitions vous oblige à redémarrer la requête avec un nouveau point de contrôle.
Le repartitionnement d’état à la demande présente les avantages suivants :
- Ajustez les requêtes en redimensionnant le nombre de partitions sans regénérer le point de contrôle.
- Ajustez les requêtes à la hausse ou à la baisse en fonction de l’évolution de la charge de travail.
Requirements
- Databricks Runtime 18.3 ou version ultérieure.
- La requête doit utiliser le fournisseur de magasin d’état RocksDB. Sur DBR 17.3 ou version ultérieure, RocksDB est le fournisseur de magasin d’état par défaut. Consultez Configurer un stockage d’état RocksDB sur Azure Databricks.
Modifier le nombre de partitions
Utilisez la configuration Spark spark.sql.streaming.stateStore.partitions et redémarrez la requête pour modifier le nombre de partitions de shuffle et d’état de streaming :
Python
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()
Scala
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()
Pour les requêtes avec état, spark.sql.streaming.stateStore.partitions est prioritaire sur spark.sql.shuffle.partitions. Une fois la requête redémarrée et la dernière microbatch planifiée terminée, la requête exécute une opération de repartition pour redistribuer les données d’état dans le nouveau nombre de partitions. Une fois l’opération de repartition terminée, la requête reprend le traitement.
État de repartition du moniteur
Une fois le microbatch suivant terminé, StreamingQueryProgress les événements incluent la durée de l’opération de repartition. Dans les métriques d’un durationMs événement, controlBatch.REPARTITION affiche la valeur de durée en millisecondes. Des tailles d’état plus importantes peuvent augmenter le temps nécessaire au repartitionnement. Consultez la surveillance des requêtes de streaming structuré sur Azure Databricks.
Example
L’exemple suivant réduit le nombre de partitions de shuffle d’une requête de 200, la valeur par défaut, à 100. Arrêtez la requête, définissez le nouveau nombre de partitions et redémarrez :
Python
# Start the query with the default partition count (200)
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
# Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
# Restart the query with the same options
query = (df
.withWatermark("event_time", "10 minutes")
.groupBy(
window("event_time", "5 minutes"),
"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
)
Scala
// Start the query with the default partition count (200)
val query = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()
// Stop the query and scale down to 100 partitions
query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")
// Restart the query with the same options
val query2 = df
.withWatermark("event_time", "10 minutes")
.groupBy(
window($"event_time", "5 minutes"),
$"id")
.count()
.writeStream
.format("delta")
.option("checkpointLocation", "/checkpoint/path")
.outputMode("append")
.start()