Compartir a través de


Uso de foreachBatch para escribir en receptores de datos arbitrarios

En este artículo se describe el uso de foreachBatch con flujo estructurado para escribir la salida de una consulta de streaming en orígenes de datos que no tengan un receptor de streaming existente.

El patrón de código streamingDF.writeStream.foreachBatch(...) permite aplicar funciones por lotes a los datos de salida de cada microlote de la consulta de streaming. Las funciones usadas con foreachBatch toman dos parámetros:

  • DataFrame que tiene los datos de salida de un microlote.
  • Identificador único del microlote.

Debe usar foreachBatch para las operaciones Merge de Delta Lake en flujo estructurado. Consulte Actualización de consultas de streaming mediante foreachBatch.

Aplicar operaciones DataFrame adicionales

Muchas operaciones DataFrame y Dataset no se admiten en DataFrames de streaming porque Spark no admite que se generen de planes incrementales en esos casos. Utilizando foreachBatch() puede aplicar algunas de estas operaciones en cada salida de microlote. Por ejemplo, es posible usar foreachBatch() y la operación SQL MERGE INTO para escribir la salida de agregaciones de streaming en una tabla Delta en modo de actualización. Consulta más detalles en MERGE INTO.

Importante

  • foreachBatch() solo proporciona garantías de escritura al menos una vez. Sin embargo, puede usar el batchId proporcionado en la función como forma de desduplicar la salida y obtener una garantía de una sola vez. En cualquier caso, tendrá que razonar sobre la semántica de un extremo a otro.
  • foreachBatch() no funciona con el modo de procesamiento continuo, ya que se basa fundamentalmente en la ejecución de microlotes de una consulta de streaming. Si escribe datos en modo continuo, utilice foreach() en su lugar.
  • Cuando se utiliza foreachBatch con un operador de estado, es importante consumir completamente cada lote antes de que finalice el procesamiento. Véase Consumir completamente cada lote DataFrame

Un dataframe vacío se puede invocar con foreachBatch() y el código de usuario debe ser resistente para permitir el funcionamiento adecuado. A continuación, se muestra un ejemplo:

  .foreachBatch(
          (outputDf: DataFrame, bid: Long) => {
             // Process valid data frames only
             if (!outputDf.isEmpty) {
                // business logic
             }
         }
  ).start()

Cambios de comportamiento para foreachBatch en Databricks Runtime 14.0

En Databricks Runtime 14.0 y versiones posteriores en el proceso configurado con el modo de acceso estándar, se aplican los siguientes cambios de comportamiento:

  • Los comandos print() escriben la salida en los registros del controlador.
  • No se puede acceder al submódulo dbutils.widgets dentro de la función.
  • Cualquier archivo, módulo u objeto referenciado en la función debe ser serializable y estar disponible en Spark.

Reutilización de orígenes de datos por lotes existentes

Con foreachBatch(), es posible usar escrituras de datos por lotes existentes para receptores de datos que podrían no tener compatibilidad con el flujo estructurado. Estos son algunos ejemplos:

Se pueden usar muchos otros orígenes de datos por lotes desde foreachBatch(). Consulte Conexión a orígenes de datos y servicios externos.

Escribir a varias ubicaciones

Si necesita escribir la salida de una consulta de streaming en varias ubicaciones, Databricks recomienda usar varios escritores de flujo estructurado para obtener la mejor paralelización y rendimiento.

El uso de foreachBatch para escribir en varios receptores serializa la ejecución de escrituras de streaming, lo que podría aumentar la latencia de cada microproceso.

Si usa foreachBatch para escribir en múltiples tablas Delta, mire Escrituras de tablas idempotentes en foreachBatch.

Consumir completamente cada DataFrame del lote

Cuando se usan operadores con estado (por ejemplo, mediante dropDuplicatesWithinWatermark), cada iteración por lotes debe consumir toda la trama de datos o reiniciar la consulta. Si no consumes el DataFrame completo, la consulta en streaming fallará con el siguiente lote.

Esto puede ocurrir en varios casos. En los ejemplos siguientes se muestra cómo corregir las consultas que no consumen correctamente un dataframe.

Uso intencional de un subconjunto del lote

Si solo le interesa un subconjunto del lote, podría tener código como el siguiente.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def partial_func(batch_df, batch_id):
  batch_df.show(2)

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

En este caso, el batch_df.show(2) solo maneja los dos primeros elementos del lote, lo cual es esperado, pero en caso de haber más elementos, estos deben ser procesados. El código siguiente consume el dataframe completo.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

# function to do nothing with a row
def do_nothing(row)
  pass

def partial_func(batch_df, batch_id):
  batch_df.show(2)
  batch_df.foreach(do_nothing) # silently consume the rest of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

En este caso, la do_nothing función omite silenciosamente el resto del dataframe.

Manejo de un error en un lote

Podría haber un error al ejecutar un foreachBatch proceso. Puede tener código como el siguiente (en este caso, el ejemplo genera intencionadamente un error para mostrar el problema).

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have


q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Al manejar (y tragar silenciosamente) el error, el resto del lote puede no ser consumido. Hay dos opciones para controlar esta situación.

En primer lugar, podría volver a lanzar el error, lo que lo pasa a su capa de orquestación para que vuelva a intentar el lote. Esto podría resolver el error, si es un problema transitorio o generarlo para que el equipo de operaciones intente corregir manualmente. Para ello, cambie el partial_func código para que tenga este aspecto:

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    raise e # re-raise the issue

La segunda opción, si desea detectar la excepción y omitir el resto del lote, es cambiar el código a este.

from pyspark.sql.functions import expr

stream = spark.readStream.format("rate").option("rowsPerSecond", "100").load()
# creates a stateful operator:
streamWithWatermark = stream.withWatermark("timestamp", "15 minutes").dropDuplicatesWithinWatermark()

def foreach_func(row):
  # handle the row, but in this case, for the sample, will just raise an error:
  raise Exception('error')

# function to do nothing with a row
def do_nothing(row)
    pass

def partial_func(batch_df, batch_id):
  try:
    batch_df.foreach(foreach_func)
  except Exception as e:
    print(e) # or whatever error handling you want to have
    batch_df.foreach(do_nothing) # silently consume the remainder of the batch

q = streamWithWatermark.writeStream \
  .foreachBatch(partial_func) \
  .option("checkpointLocation", checkpoint_dir) \
  .trigger(processingTime='2 seconds') \
  .start()

Este código usa la do_nothing función para omitir silenciosamente el resto del lote.