Compartir a través de


Consideraciones de producción para Structured Streaming

Este artículo contiene recomendaciones para programar cargas de trabajo de Structured Streaming mediante trabajos en Azure Databricks.

Databricks recomienda realizar siempre lo siguiente:

  • Quite el código innecesario de los cuadernos que pudieran devolver resultados, como display y count.
  • No ejecute cargas de trabajo de Structured Streaming mediante proceso de uso completo. Programe siempre secuencias como trabajos mediante el proceso de trabajos.
  • Programar trabajos mediante el modo Continuous.
  • No habilite el escalado automático para el proceso de trabajos de Structured Streaming.

Algunas cargas de trabajo se benefician de lo siguiente:

El marco Delta Live Tables de Azure Databricks está desarrollado para reducir las complejidades de la administración de la infraestructura de producción para las cargas de trabajo de Structured Streaming. Databricks recomienda que use Delta Live Tables para las nuevas canalizaciones de Structured Streaming. Consulte ¿Qué es Delta Live Tables?

Nota:

El escalado automático de proceso tiene limitaciones al reducir verticalmente el tamaño del clúster para cargas de trabajo de Structured Streaming. Databricks recomienda usar tablas Delta Live con escalado automático mejorado para cargas de trabajo de streaming. Consulte Optimización del uso del clúster de canalizaciones de Delta Live Tables con escalado automático mejorado.

Diseño de cargas de trabajo de streaming para esperar un error

Databricks recomienda configurar siempre trabajos de streaming para reiniciarse automáticamente en caso de error. Algunas funciones, incluida la evolución del esquema, asumen que las cargas de trabajo de Structured Streaming están configuradas para reintentar automáticamente. Consulte Configuración de trabajos de Structured Streaming para que reinicien las consultas de streaming en caso de error.

Algunas operaciones como foreachBatch proporcionan garantías de al menos una vez en lugar de exactamente una vez. Para estas operaciones, debe hacer que la canalización de procesamiento sea idempotente. Consulte Uso de foreachBatch para escribir en receptores de datos arbitrarios.

Nota:

Cuando se reinicia una consulta, el microlote planeado durante los procesos de ejecución anteriores. Si el trabajo no se pudo realizar debido a un error de memoria insuficiente o a que canceló manualmente un trabajo debido a un microlote sobredimensionado, es posible que tenga que escalar verticalmente el proceso para procesar correctamente el microlote.

Si cambia las configuraciones entre ejecuciones, estas configuraciones se aplican al primer lote nuevo planeado. Consulte Recuperación después de realizar cambios en una consulta de Structured Streaming.

¿Cuándo se reintenta un trabajo?

Puede programar varias tareas como parte de un trabajo de Azure Databricks. Al configurar un trabajo mediante el desencadenador continuo, no se pueden establecer dependencias entre tareas.

Puede optar por programar varias secuencias en un solo trabajo mediante uno de los métodos siguientes:

  • Varias tareas: defina un trabajo con varias tareas que ejecutan cargas de trabajo de streaming mediante el desencadenador continuo.
  • Varias consultas: defina varias consultas de streaming en el código fuente para una sola tarea.

También puede combinar estas estrategias. La siguiente tabla compara estos enfoques.

Varias tareas Múltiples consultas
¿Cómo se comparte el proceso? Databricks recomienda implementar trabajos de proceso de tamaño adecuado para cada tarea de streaming. Opcionalmente, puede compartir el proceso entre tareas. Todas las consultas comparten el mismo proceso. Puede asignar consultas opcionales a grupos de programador.
¿Cómo se controlan los reintentos? Todas las tareas deben producir un error antes de que el trabajo vuelva a intentarlo. La tarea vuelve a intentarlo si se produce un error en alguna consulta.

Configuración de trabajos de Structured Streaming para que reinicien las consultas de streaming en caso de error

Databricks recomienda configurar todas las cargas de trabajo de streaming mediante el desencadenador continuo. Consulte Ejecución de trabajos continuamente.

El desencadenador continuo proporciona el siguiente comportamiento de forma predeterminada:

  • Impide que se ejecute más de una ejecución simultánea del trabajo.
  • Inicia una nueva ejecución cuando se produce un error en una ejecución anterior.
  • Usa retroceso exponencial para reintentos.

Databricks recomienda usar siempre el proceso de trabajos en lugar del proceso multiuso al programar flujos de trabajo. En caso de error y reintento del trabajo, se implementan nuevos recursos de proceso.

Nota:

No es necesario usar streamingQuery.awaitTermination() ni spark.streams.awaitAnyTermination(). Los trabajos impiden automáticamente que se complete ningún trabajo cuando cualquier consulta de streaming esté activa.

Uso de grupos de programador para varias consultas de streaming

Puede configurar grupos de programación para asignar capacidad de proceso a las consultas al ejecutar varias consultas de streaming desde el mismo código fuente.

De forma predeterminada, todas las consultas iniciadas en un cuaderno se ejecutan en el mismo grupo de programación imparcial. Los trabajos de Apache Spark generados por desencadenadores de todas las consultas de streaming de un cuaderno se ejecutan uno tras otro en el orden FIFO (primero en entrar, primero en salir). Esto puede provocar retrasos innecesarios en las consultas, ya que no comparten eficazmente los recursos del clúster.

Los grupos de programadores permiten declarar qué consultas de Structured Streaming comparten recursos de proceso.

En el ejemplo siguiente se asigna query1 a un grupo dedicado, mientras que query2 y query3 comparten un grupo de programadores.

# Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").toTable("table1")

# Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").toTable("table2")

# Run streaming query3 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query3").toTable("table3")

Nota:

La configuración de la propiedad local debe estar en la misma celda del cuaderno en que se inicia la consulta de streaming.

Para más información, consulte la documentación del programador justo de Apache.