Reparticionamento de estado sob demanda para consultas de streaming com estado

O reparticionamento de estado sob demanda permite redimensionar o número de partições de uma consulta com estado do Structured Streaming sem perder o estado do checkpoint.

Sem o reparticionamento de estado sob demanda, você define o número de partições embaralhadas durante a criação do ponto de verificação. Se você alterar spark.sql.shuffle.partitions, as consultas com pontos de verificação existentes ignorarão o novo valor. A aplicação de uma nova contagem de partições exige que você reinicie a consulta com um novo ponto de verificação.

O reparticionamento de estado sob demanda tem os seguintes benefícios:

  • Ajuste as consultas redimensionando o número de partições sem recriar o checkpoint.
  • Dimensione as consultas para cima ou para baixo para corresponder às alterações de carga de trabalho.

Requisitos

Alterar o número de partições

Use a configuração do Spark spark.sql.streaming.stateStore.partitions e reinicie a consulta para alterar o número de partições de embaralhamento e de estado de streaming:

Python

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()

Scala

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()

Para consultas com estado, spark.sql.streaming.stateStore.partitions tem precedência sobre spark.sql.shuffle.partitions. Depois que a consulta for reiniciada e a última microbatch planejada for concluída, a consulta executará uma operação de repartição para redistribuir dados de estado no novo número de partições. Após a conclusão da operação de repartição, a consulta retomará o processamento.

Monitorar o estado de repartição

Após a conclusão do próximo microlote, os eventos StreamingQueryProgress incluem a duração da operação de reparticionamento. Nas métricas de durationMs um evento, controlBatch.REPARTITION mostra o valor da duração em milissegundos. Tamanhos de estado maiores podem aumentar o tempo de repartição. Consulte monitoramento de consultas de streaming estruturado no Azure Databricks.

Example

O exemplo a seguir reduz uma consulta de 200, o padrão, para 100 partições aleatórias. Interrompa a consulta, defina a nova contagem de partições e reinicie:

Python

# Start the query with the default partition count (200)
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

# Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

# Restart the query with the same options
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

Scala

// Start the query with the default partition count (200)
val query = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()

// Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

// Restart the query with the same options
val query2 = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()