Compartir a través de


Recuperación de errores de consulta de flujo estructurado con flujos de trabajo

El motor de procesamiento Structured Streaming proporciona tolerancia a errores y coherencia de datos para las consultas de streaming. El uso de los flujos de trabajo de Azure Databricks le permitirá configurar fácilmente las consultas de Structured Streaming de forma que se reinicien automáticamente en caso de error. Si habilitar el uso de puntos de control para una consulta de streaming, podrá reiniciar la consulta después de un error. Las consultas que se reinicien continuarán donde se lo hayan dejado las que fallaron.

Habilitación de puntos de control mejorados para consultas de Structured Streaming

El equipo de Databricks recomienda que, antes de iniciar la consulta, siempre se especifique el valor de la opción checkpointLocation como ruta de acceso de almacenamiento en la nube. Por ejemplo:

streamingDataFrame.writeStream
  .format("parquet")
  .option("path", "/path/to/table")
  .option("checkpointLocation", "/path/to/table/_checkpoint")
  .start()

Esta ubicación de punto de control conserva toda la información esencial que identifica a las consultas. Cada consulta debe tener una ubicación de punto de control diferente. Nunca use la misma ubicación para varias consultas. Para más información, consulte la Guía de programación de Structured Streaming.

Nota:

Aunque la opción checkpointLocation es necesaria para la mayoría de tipos de receptores de salida, hay algunos, como el receptor de memoria, que pueden generar una ubicación de punto de comprobación temporal en DBFS de forma automática si no se especifica un valor para checkpointLocation. El uso de ubicaciones de los puntos de comprobación temporales no garantiza la tolerancia a errores ni la coherencia de los datos y es posible que estas no se limpien correctamente. Especifique un valor para la opción checkpointLocation para evitar posibles problemas.

Configuración de trabajos de Structured Streaming para que reinicien las consultas de streaming en caso de error

Se pueden crear trabajos de Azure Databricks con el cuaderno o el archivo JAR que tiene las consultas de streaming y configurarlo para que:

  • Use siempre un clúster seguro.
  • Vuelva a intentarlo siempre si se produce un error.

El reinicio automático en caso de error de trabajo es especialmente importante al configurar cargas de trabajo de streaming con la evolución del esquema. La evolución del esquema funciona en Azure Databricks mediante la generación de un error esperado cuando se detecta un cambio de esquema y a después procesa correctamente los datos mediante el nuevo esquema cuando se reinicia el trabajo. Databricks recomienda configurar siempre las tareas de streaming que contienen consultas con evolución del esquema para reiniciarse automáticamente en flujos de trabajo de Databricks.

Los trabajos poseen una estrecha integración con las API de Structured Streaming y pueden supervisar todas las consultas de streaming que estén activas en una ejecución. Esta configuración garantiza que si se produce un error en cualquier parte de la consulta, los trabajos finalicen automáticamente la ejecución (junto con todas las demás consultas) e inicien una nueva ejecución en un nuevo clúster. Esto causa que se vuelva a ejecutar el cuaderno o el código JAR y que se vuelvan a iniciar todas las consultas. Esta es la manera más segura de volver a un buen estado.

Nota:

  • Si se produce un error en cualquiera de las consultas de streaming activas, provoca un error en la ejecución activa y la terminación de las restantes consultas de streaming.
  • No es necesario usar streamingQuery.awaitTermination() ni spark.streams.awaitAnyTermination() al final del cuaderno. Los trabajos impiden automáticamente que se complete ningún trabajo cuando cualquier consulta de streaming esté activa.
  • Databricks recomienda usar trabajos en lugar de %run y dbutils.notebook.run() al orquestar cuadernos de Structured Streaming. Consulte Ejecutar un cuaderno de Databricks desde otro cuaderno.

A continuación, se muestra un ejemplo de una configuración de trabajo recomendada.

  • Clúster: establezca esta opción siempre para usar un clúster nuevo y usar la versión más reciente de Spark (o al menos la versión 2.1). Las consultas iniciadas tanto en Spark 2.1 como en las versiones superiores se pueden recuperan después de las actualizaciones de la consulta y de la versión de Spark.
  • Notificaciones: esta opción se establece si desea recibir notificaciones por correo electrónico si hay errores.
  • Programación: no establezca una programación.
  • Tiempo de espera: no establezca un tiempo de espera. Las consultas de streaming se ejecutan durante un tiempo indefinido.
  • Número máximo de ejecuciones concurrentes: establézcalo en 1. No puede haber más de una instancia de cada consulta activa.
  • Reintentos: se establece en Ilimitado.

Consulte Creación y ejecución de trabajos de Azure Databricks para comprender estas configuraciones.

Recuperación después de realizar cambios en una consulta de Structured Streaming

Hay limitaciones en cuanto a los cambios de una consulta de streaming que se permiten entre reinicios desde la misma ubicación del punto de control. Estos son algunos cambios que no están permitidos o el efecto del cambio no está bien definido. Para todos ellos:

  • El término permitido significa que puede realizar el cambio especificado, pero si la semántica de su efecto está bien definida depende de la consulta y del cambio.
  • El término no permitido significa que no debe realizar el cambio especificado, ya que es probable que la consulta reiniciada tenga errores impredecibles.
  • sdf representa un dataframe/conjunto de datos de streaming generado con sparkSession.readStream.

Tipos de cambios en las consultas de Structured Streaming

  • Cambios en el número o tipo (es decir, origen diferente) de los orígenes de entrada: no se permite.
  • Cambios en los parámetros de los orígenes de entrada: si se permite y si la semántica del cambio está bien definida depende del origen y de la consulta. Estos son algunos ejemplos.
    • Se permiten la adición, eliminación y modificación de los límites de velocidad:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      to

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • Por lo general, no se permiten cambios en los artículos y archivos suscritos, ya que los resultados son impredecibles: spark.readStream.format("kafka").option("subscribe", "article")a spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Cambios en el intervalo del desencadenador: puede cambiar los desencadenadores entre lotes incrementales e intervalos de tiempo. Consulte Cambio de intervalos del desencadenador entre ejecuciones.
  • Cambios en el tipo de receptor de salida: se permiten cambios entre algunas combinaciones específicas de receptores. Esto debe comprobarse caso a caso. Estos son algunos ejemplos.
    • Se permite el paso del receptor de archivos al receptor de Kafka. Kafka solo verá los datos nuevos.
    • No se permite el paso del receptor de Kafka al receptor de archivos.
    • Se ha cambiado el receptor de Kafka a foreach, o viceversa.
  • Cambios en los parámetros del receptor de salida: si se permite y si la semántica del cambio está bien definida dependen del receptor y de la consulta. Estos son algunos ejemplos.
    • No se permiten cambios en el directorio de salida de un receptor de archivos: sdf.writeStream.format("parquet").option("path", "/somePath") a sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • Se permiten cambios en el tema de salida: sdf.writeStream.format("kafka").option("topic", "topic1") a sdf.writeStream.format("kafka").option("topic", "topic2")
    • Se permiten cambios en el receptor foreach definido por el usuario (es decir, el códigoForeachWriter), pero la semántica del cambio depende del código.
  • Cambios en las operaciones similares a proyección, filtro o mapa: se permiten algunos casos. Por ejemplo:
    • Se permite la adición o eliminación de filtros: sdf.selectExpr("a") a sdf.where(...).selectExpr("a").filter(...).
    • Se permiten cambios en las proyecciones con el mismo esquema de salida: sdf.selectExpr("stringColumn AS json").writeStream a sdf.select(to_json(...).as("json")).writeStream.
    • Los cambios en las proyecciones con un esquema de salida diferente se permiten condicionalmente: sdf.selectExpr("a").writeStream a sdf.selectExpr("b").writeStream solo se permite si el receptor de salida permite el cambio de esquema de "a" a "b".
  • Cambios en operaciones con estado: algunas operaciones de las consultas de streaming deben mantener los datos de estado en orden para actualizar continuamente el resultado. Structured Streaming crea automáticamente puntos de comprobación de los datos de estado en el almacenamiento tolerante a errores (por ejemplo, DBFS o Azure Blob Storage) y los restaura después del reinicio. Sin embargo, esto supone que el esquema de los datos de estado sigue siendo el mismo en los distintos reinicios, lo que significa que no se permiten cambios (es decir, adiciones, eliminaciones o modificaciones de esquema) en las operaciones con estado de una consulta de streaming entre reinicios. Esta es la lista de operaciones con estado cuyo esquema no se debe cambiar de un reinicio a otro para garantizar la recuperación del estado:
    • Agregación de streaming: por ejemplo, sdf.groupBy("a").agg(...). No se permite ningún cambio en el número o tipo de claves o agregados de agrupación.
    • Desduplicación de streaming: por ejemplo, sdf.dropDuplicates("a"). No se permite ningún cambio en el número o tipo de claves o agregados de agrupación.
    • Combinación flujo-flujo: por ejemplo, sdf1.join(sdf2, ...) (es decir, ambas entradas se generan con sparkSession.readStream). No se permiten cambios en el esquema ni en las columnas de combinación de igualdad. No se permiten cambios en el tipo de combinación (externo o interno). Otros cambios en la condición de combinación están mal definidos.
    • Operación arbitraria con estado: por ejemplo, sdf.groupByKey(...).mapGroupsWithState(...) o sdf.groupByKey(...).flatMapGroupsWithState(...). No se permite ningún cambio en el esquema del estado definido por el usuario y el tipo de tiempo de espera. Se permite cualquier cambio dentro de la función de asignación de estado definida por el usuario, pero el efecto semántico del cambio depende de la lógica definida por el usuario. Si realmente desea admitir cambios de esquema de estado, puede codificar o descodificar explícitamente las estructuras de datos de estado complejo en bytes mediante un esquema de codificación/descodificación que admita la migración de esquemas. Por ejemplo, si guarda el estado como bytes codificados en Avro, puede cambiar el esquema de estado de Avro durante el tiempo entre los reinicios de consulta, ya que esto restaurar el estado binario.