Ripartizione dello stato su richiesta per le query di streaming con stato

Il ripartizionamento dello stato su richiesta consente di ridimensionare il numero di partizioni per una query di Structured Streaming con stato senza perdere lo stato del checkpoint.

Senza ripartizione dello stato su richiesta, si imposta il numero di partizioni di shuffle al momento della creazione del checkpoint. Se si modifica spark.sql.shuffle.partitions, le query con checkpoint esistenti ignorano il nuovo valore. L'applicazione di un nuovo numero di partizioni richiede di riavviare la query con un nuovo checkpoint.

Il ripartizionamento dello stato su richiesta offre i vantaggi seguenti:

  • Ottimizza le query ridimensionando il numero di partizioni senza ricostruire il checkpoint.
  • Aumentare o ridurre le query in base alle modifiche del carico di lavoro.

Requirements

Modificare il numero di partizioni

Usare la configurazione Spark spark.sql.streaming.stateStore.partitions e riavviare la query per modificare il numero di partizioni di shuffle e dello stato di streaming:

Python

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()

Scala

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()

Per le query con stato, spark.sql.streaming.stateStore.partitions ha la precedenza su spark.sql.shuffle.partitions. Al riavvio della query e al completamento dell'ultimo microbatch pianificato, la query esegue un'operazione di ripartizione per ridistribuire i dati sullo stato nel nuovo numero di partizioni. Al termine dell'operazione di ripartizione, la query riprende l'elaborazione.

Monitorare lo stato di ripartizione

Dopo il completamento del microbatch successivo, gli eventi StreamingQueryProgress includono la durata dell'operazione di ripartizione. Nelle metriche di un evento, controlBatch.REPARTITION indica il valore della durata in millisecondi. Dimensioni di stato maggiori possono aumentare il tempo di ripartizionamento. Consulta Monitoring Structured Streaming queries on Azure Databricks (Monitoraggio delle query di streaming strutturato in Azure Databricks).

Example

L'esempio seguente riduce una query da 200, il valore predefinito, a 100 partizioni di shuffle. Arrestare la query, impostare il nuovo numero di partizioni e riavviare:

Python

# Start the query with the default partition count (200)
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

# Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

# Restart the query with the same options
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

Scala

// Start the query with the default partition count (200)
val query = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()

// Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

// Restart the query with the same options
val query2 = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()