Share via


Uso de foreachBatch para escribir en receptores de datos arbitrarios

En este artículo se describe el uso de foreachBatch con flujo estructurado para escribir la salida de una consulta de streaming en orígenes de datos que no tengan un receptor de streaming existente.

El patrón de código streamingDF.writeStream.foreachBatch(...) permite aplicar funciones por lotes a los datos de salida de cada microlote de la consulta de streaming. Las funciones usadas con foreachBatch toman dos parámetros:

  • DataFrame que tiene los datos de salida de un microlote.
  • Identificador único del microlote.

Debe usar foreachBatch para las operaciones Merge de Delta Lake en flujo estructurado. Consulte Actualización de consultas de streaming mediante foreachBatch.

Aplicar operaciones DataFrame adicionales

Muchas operaciones DataFrame y Dataset no se admiten en DataFrames de streaming porque Spark no admite que se generen de planes incrementales en esos casos. Utilizando foreachBatch() puede aplicar algunas de estas operaciones en cada salida de microlote. Por ejemplo, es posible usar foreachBath() y la operación SQL MERGE INTO para escribir la salida de agregaciones de streaming en una tabla Delta en modo de actualización. Ver más detalles en MERGE INTO.

Importante

  • foreachBatch() solo proporciona garantías de escritura al menos una vez. Sin embargo, puede usar el batchId proporcionado en la función como forma de desduplicar la salida y obtener una garantía de una sola vez. En cualquier caso, tendrá que razonar sobre la semántica de un extremo a otro.
  • foreachBatch() no funciona con el modo de procesamiento continuo, ya que se basa fundamentalmente en la ejecución de microlotes de una consulta de streaming. Si escribe datos en modo continuo, utilice foreach() en su lugar.

Un dataframe vacío se puede invocar con foreachBatch() y el código de usuario debe ser resistente para permitir el funcionamiento adecuado. A continuación, se muestra un ejemplo:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Cambios de comportamiento para foreachBatch en Databricks Runtime 14.0

En Databricks Runtime 14.0 y versiones posteriores en el proceso configurado con el modo de acceso compartido, forEachBatch se ejecuta en un proceso de Python aislado independiente en Apache Spark, en lugar de en el entorno de REPL. Se serializa e inserta en Spark y no tiene acceso a objetos globales spark durante la sesión.

En todas las demás configuraciones de proceso, foreachBatch se ejecuta en el mismo REPL de Python que ejecuta el resto del código. Como resultado, la función no se serializa.

Cuando se usa Databricks Runtime 14.0 y versiones posteriores en el proceso configurado con el modo de acceso compartido, debe usar la variable sparkSession en el ámbito del DataFrame local cuando se usa foreachBatch en Python, como en el siguiente ejemplo de código:

def example_function(df, batch_id):
  df.sparkSession.sql("<query>")

Se aplican los siguientes cambios de comportamiento:

  • No puede acceder a ninguna variable global de Python desde dentro de la función.
  • print() los comandos escriben la salida en los registros del controlador.
  • Cualquier archivo, módulo u objeto referenciado en la función debe ser serializable y estar disponible en Spark.

Reutilización de orígenes de datos por lotes existentes

Con foreachBatch(), es posible usar escrituras de datos por lotes existentes para receptores de datos que podrían no tener compatibilidad con el flujo estructurado. Estos son algunos ejemplos:

Se pueden usar muchos otros orígenes de datos por lotes desde foreachBatch(). Consulte Conexión a orígenes de datos.

Escribir a varias ubicaciones

Si necesita escribir la salida de una consulta de streaming en varias ubicaciones, Databricks recomienda usar varios escritores de flujo estructurado para obtener la mejor paralelización y rendimiento.

El uso de foreachBatch para escribir en varios receptores serializa la ejecución de escrituras de streaming, lo que podría aumentar la latencia de cada microproceso.

Si usa foreachBatch para escribir en varias tablas Delta, consulte Escrituras de tablas Idempotent en foreachBatch.