Compartir a través de


Consideraciones de producción para Structured Streaming

Este artículo contiene recomendaciones para programar cargas de trabajo de Structured Streaming mediante trabajos en Azure Databricks.

Databricks recomienda hacer siempre lo siguiente:

  • Quite el código innecesario de los cuadernos que devolverían resultados, como display y count.
  • No ejecute cargas de trabajo de Structured Streaming mediante proceso multiuso. Programe siempre los flujos como trabajos utilizando proceso de trabajos.
  • Programar trabajos mediante el modo Continuous.
  • No habilite el escalado automático para el proceso para trabajos de Structured Streaming.

Algunas cargas de trabajo se benefician de lo siguiente:

Azure Databricks ha introducido Lakeflow Spark Declarative Pipelines para reducir las complejidades de gestión de la infraestructura de producción para cargas de trabajo de Structured Streaming. Databricks recomienda usar canalizaciones declarativas de Spark de Lakeflow para las nuevas canalizaciones de Structured Streaming. Consulte Canalizaciones declarativas de Spark de Lakeflow.

Nota

El escalado automático de proceso tiene limitaciones al reducir verticalmente el tamaño del clúster para cargas de trabajo de Structured Streaming. Databricks recomienda usar canalizaciones declarativas de Spark de Lakeflow con escalado automático mejorado para cargas de trabajo de streaming. Consulte Optimización del uso del clúster de canalizaciones declarativas de Spark de Lakeflow con escalado automático.

Diseño de cargas de trabajo de streaming para esperar un error

Databricks recomienda configurar siempre los trabajos de streaming para reiniciarse automáticamente en caso de error. Algunas funciones, incluida la evolución del esquema, asumen que las cargas de trabajo de Structured Streaming están configuradas para reintentar automáticamente. Consulte Configuración de trabajos de Structured Streaming para que reinicien las consultas de streaming en caso de error.

Algunas operaciones como foreachBatch proporcionan al menos una vez en lugar de garantías exactamente una vez. Para estas operaciones, debe hacer que la canalización de procesamiento sea idempotente. Consulte Uso de foreachBatch para escribir en receptores de datos arbitrarios.

Nota

Cuando se reinicia una consulta, se procesa el microlote planificado durante la ejecución anterior. Si su trabajo falló debido a un error de falta de memoria o canceló manualmente un trabajo debido a un microlote sobredimensionado, es posible que necesite escalar verticalmente el proceso informático para procesar correctamente el microlote.

Si cambia las configuraciones entre ejecuciones, estas configuraciones se aplican al primer lote nuevo planeado. Consulte Recuperación después de los cambios en una consulta de Structured Streaming.

¿Cuándo se reintenta un trabajo?

Puede programar varias tareas como parte de un trabajo de Azure Databricks. Al configurar un trabajo mediante el desencadenador continuo, no se pueden establecer dependencias entre tareas.

Puede optar por programar varias secuencias en un solo trabajo mediante uno de los siguientes métodos:

  • Varias tareas: defina un trabajo con varias tareas que ejecutan cargas de trabajo de streaming mediante el desencadenador continuo.
  • Varias consultas: defina varias consultas de streaming en el código fuente para una sola tarea.

También puede combinar estas estrategias. En la siguiente tabla se comparan estos enfoques.

Estrategia: Varias tareas Varias consultas
¿Cómo se comparte el proceso? Databricks recomienda implementar procesos de trabajo del tamaño adecuado para cada tarea de streaming. Opcionalmente, puede compartir el proceso entre tareas. Todas las consultas comparten el mismo proceso. Puede asignar consultas opcionales a grupos de programador.
¿Cómo se controlan los reintentos? Todas las tareas deben producir un error antes de que el trabajo se reintente. La tarea vuelve a intentarlo si se produce un error en alguna consulta.

Configurar trabajos de Structured Streaming para reiniciar las consultas de streaming en caso de error

Databricks recomienda configurar todas las cargas de trabajo de streaming mediante el desencadenador continuo. Consulte Ejecución de trabajos continuamente.

El desencadenador continuo proporciona el siguiente comportamiento de forma predeterminada:

  • Impide que se ejecute más de una ejecución simultánea del trabajo.
  • Inicia una nueva ejecución cuando se produce un error en una ejecución anterior.
  • Usa retroceso exponencial para reintentos.

Databricks recomienda usar siempre el proceso de trabajos en lugar del proceso multiuso al programar flujos de trabajo. En caso de error y reintento del trabajo, se implementan nuevos recursos de proceso.

Nota

No es necesario usar streamingQuery.awaitTermination() ni spark.streams.awaitAnyTermination(). Los trabajos impiden que una ejecución se complete automáticamente cuando una consulta de streaming esté activa.

Uso de grupos de programador para varias consultas de streaming

Puede configurar grupos de programación para asignar capacidad de proceso a las consultas al ejecutar varias consultas de streaming desde el mismo código fuente.

De manera predeterminada, todas las consultas iniciadas en un cuaderno se ejecutan en el mismo grupo de programación justo. Los trabajos de Apache Spark generados por desencadenadores a partir de todas las consultas de streaming de un cuaderno se ejecutan uno tras otro en orden "primero en entrar, primero en salir" (FIFO). Esto puede provocar retrasos innecesarios en las consultas, ya que no comparten eficazmente los recursos del clúster.

Los grupos de programadores permiten declarar qué consultas de Structured Streaming comparten recursos de proceso.

En el ejemplo siguiente se asigna query1 a un grupo dedicado, mientras que query2 y query3 comparten un grupo de programadores.

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

Nota

La configuración de la propiedad local debe estar en la misma celda del cuaderno en que se inicia la consulta de streaming.

Consulte documentación del programador justo de Apache para obtener más información.