Repartición de estado a petición para consultas de streaming con estado

La repartición de estado a petición permite cambiar el tamaño del número de particiones de una consulta de Streaming estructurado con estado sin perder el estado del punto de comprobación.

Sin el reparticionamiento del estado bajo demanda, se establece el número de particiones de mezcla durante la creación del punto de control. Si cambia spark.sql.shuffle.partitions, las consultas con puntos de control existentes omiten el nuevo valor. La aplicación de un nuevo recuento de particiones requiere reiniciar la consulta con un nuevo punto de control.

La repartición de estados a petición tiene las siguientes ventajas:

  • Ajuste las consultas cambiando el número de particiones sin volver a generar el punto de comprobación.
  • Escale o reduzca verticalmente las consultas para que coincidan con los cambios de carga de trabajo.

Requirements

Cambiar el número de particiones

Use la configuración de Spark spark.sql.streaming.stateStore.partitions y reinicie la consulta para cambiar el número de particiones de shuffle y del estado de streaming:

Python

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
query = df.writeStream.start()

Scala

query.stop()
spark.conf.set("spark.sql.streaming.stateStore.partitions", "<numPartitions>")
val query = df.writeStream.start()

En el caso de las consultas con estado, spark.sql.streaming.stateStore.partitions tiene prioridad sobre spark.sql.shuffle.partitions. Una vez reiniciada la consulta y se completa la última microbatch planeada, la consulta ejecuta una operación de repartición para redistribuir los datos de estado en el nuevo número de particiones. Una vez completada la operación de repartición, la consulta reanuda su procesamiento.

Supervisión del estado de repartición

Una vez completado el siguiente microbatch, StreamingQueryProgress los eventos incluyen la duración de la operación de repartición. En las métricas de durationMs un evento, controlBatch.REPARTITION muestra el valor de duración en milisegundos. Los tamaños de estado más grandes pueden aumentar el tiempo de volver a particionar. Consulte Supervisión de consultas de streaming estructurado en Azure Databricks.

Example

El siguiente ejemplo reduce una consulta de 200, el valor predeterminado, a 100 particiones de mezcla. Detenga la consulta, establezca el nuevo recuento de particiones y reinicie:

Python

# Start the query with the default partition count (200)
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

# Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

# Restart the query with the same options
query = (df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window("event_time", "5 minutes"),
    "id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()
)

Scala

// Start the query with the default partition count (200)
val query = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()

// Stop the query and scale down to 100 partitions
query.stop()

spark.conf.set("spark.sql.streaming.stateStore.partitions", "100")

// Restart the query with the same options
val query2 = df
  .withWatermark("event_time", "10 minutes")
  .groupBy(
    window($"event_time", "5 minutes"),
    $"id")
  .count()
  .writeStream
  .format("delta")
  .option("checkpointLocation", "/checkpoint/path")
  .outputMode("append")
  .start()