Comparteix via


Uso de ForEachBatch para escribir en receptores de datos arbitrarios en canalizaciones

Importante

La foreach_batch_sink API está en versión preliminar pública.

El receptor ForEachBatch permite procesar una secuencia como una serie de microprocesos. Cada lote se puede procesar en Python con lógica personalizada similar a la de foreachBatchApache Spark Structured Streaming. Con el receptor ForEachBatch de Lakeflow Spark Declarative Pipelines (SDP), puede transformar, combinar o escribir datos de streaming en uno o varios destinos que no admiten escrituras de streaming de forma nativa. En esta página se explica cómo configurar un receptor ForEachBatch, se proporcionan ejemplos y se describen las consideraciones clave.

El receptor ForEachBatch proporciona la siguiente funcionalidad:

  • Lógica personalizada para cada microlote: ForEachBatch es un destino de streaming flexible. Puede aplicar acciones arbitrarias (como combinar en una tabla externa, escribir en varios destinos o realizar upserts) con código de Python.
  • Compatibilidad con la actualización completa: las canalizaciones administran los puntos de control por flujo, por lo que los puntos de control se restablecen automáticamente cuando se realiza una actualización completa de la canalización. Con el receptor ForEachBatch, usted es responsable de administrar el restablecimiento de datos de bajada cuando esto sucede.
  • Compatibilidad con catálogos de Unity: el receptor ForEachBatch admite todas las características del catálogo de Unity, como leer o escribir en volúmenes o tablas del catálogo de Unity.
  • Mantenimiento limitado: la canalización no realiza un seguimiento de los datos que se escriben desde un fregadero ForEachBatch, por lo que no puede limpiar esos datos. Usted es responsable de cualquier administración de datos posterior.
  • Entradas del registro de eventos: el registro de eventos de la canalización registra la creación y el uso de cada sumidero ForEachBatch. Si la función de Python no es serializable, verá una entrada de advertencia en el registro de eventos con sugerencias adicionales.

Nota:

  • El receptor ForEachBatch está diseñado para las consultas de streaming, como append_flow. No está pensado para canalizaciones en lote ni para las AutoCDC semánticas.
  • El receptor ForEachBatch descrito en esta página es para canalizaciones. Apache Spark Structured Streaming también admite foreachBatch. Para obtener información sobre Structured Streaming foreachBatch, consulte Uso de foreachBatch para escribir en receptores de datos arbitrarios.

Cuándo usar un receptor ForEachBatch

Use un receptor ForEachBatch siempre que la canalización requiera funcionalidad que no esté disponible a través de un formato de receptor integrado, como delta, o kafka. Entre los casos de uso típicos se incluyen:

  • Combinar o actualizar en una tabla de Delta Lake: ejecute la lógica de combinación personalizada para cada micro-lote (por ejemplo, manejar registros actualizados).
  • Escritura en múltiples destinos o destinos no admitidos: escriba la salida de cada lote en varias tablas o sistemas de almacenamiento externos que no admitan escrituras de transmisión (como ciertos receptores JDBC).
  • Aplicación de lógica o transformaciones personalizadas: manipule datos en Python directamente (por ejemplo, mediante bibliotecas especializadas o transformaciones avanzadas).

Para obtener información sobre los receptores integrados o la creación de receptores personalizados con Python, consulte Sinks in Lakeflow Spark Declarative Pipelines (Receptores en canalizaciones declarativas de Spark de Lakeflow).

Syntax

Utilice la @dp.foreach_batch_sink() decoración para generar un receptor ForEachBatch. A continuación, puede hacer referencia a esto como target en la definición de flujo, por ejemplo, en @dp.append_flow.

from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
    """
    Required:
      - `df`: a Spark DataFrame representing the rows of this micro-batch.
      - `batch_id`: unique integer ID for each micro-batch in the query.
    """
    # Your custom write or transformation logic here
    # Example:
    # df.write.format("some-target-system").save("...")
    #
    # To access the sparkSession inside the batch handler, use df.sparkSession.
Parámetro Description
name Optional. Un nombre único para identificar el destino dentro de la canalización. El nombre de la UDF se usa por defecto cuando no se incluye.
batch_handler Esta es la función definida por el usuario (UDF) a la que se llamará para cada microproceso.
Df DataFrame de Spark que contiene datos para el micro-lote actual.
batch_id Identificador entero del micro-lote. Spark incrementa este identificador para cada intervalo de activación.
Un batch_id de 0 representa el comienzo de una secuencia o el inicio de una actualización completa. El código foreach_batch_sink debe controlar correctamente una actualización completa para los orígenes de datos descendentes. Consulte la sección siguiente para obtener más información.

Actualización completa

Dado que ForEachBatch usa una consulta de streaming, la canalización realiza un seguimiento del directorio de punto de control de cada flujo. En actualización completa:

  • Se restablece el directorio del punto de control.
  • La función de sumidero (foreach_batch_sink UDF) ve un nuevo ciclo batch_id a partir de 0.
  • La canalización no limpia automáticamente los datos del sistema de destino (ya que la canalización no sabe dónde se escriben los datos). Si necesita un escenario de pizarra limpia, debe quitar o truncar manualmente las tablas o ubicaciones externas que rellena el receptor forEachBatch.

Uso de características del catálogo de Unity

Todas las funcionalidades existentes del catálogo de Unity en Spark Structured Streaming foreach_batch_sink siguen estando disponibles.

Esto incluye escribir en tablas administradas o externas de Unity Catalog. Puede escribir micro-lotes en tablas externas o administradas de Unity Catalog del mismo modo que lo haría en cualquier trabajo de streaming estructurado de Apache Spark.

Entradas del registro de eventos

Al crear un receptor de ForEachBatch, se agrega un evento SinkDefinition con "format": "foreachBatch" al registro de eventos de la canalización.

Esto le permite realizar un seguimiento del uso de los receptores ForEachBatch y ver advertencias sobre el receptor.

Uso de Databricks Connect

Si la función que proporciona no es serializable (un requisito importante para Databricks Connect), el registro de eventos incluye una WARN entrada que recomienda simplificar o refactorizar el código si se requiere compatibilidad con Databricks Connect.

Por ejemplo, si usa dbutils para obtener parámetros dentro de una UDF ForEachBatch, puede obtener el argumento antes de usarlo en la UDF:

# Instead of accessing parameters within the UDF...
def foreach_batch(df, batchId):
  value = dbutils.widgets.get ("X") + str (i)

# ...get the parameters first, and use them within the UDF:
argX = dbutils.widgets.get ("X")

def foreach_batch(df, batchId):
  value = argX + str (i)

procedimientos recomendados

  1. Mantenga concisa la función ForEachBatch: evite subprocesos, dependencias de biblioteca intensivas o manipulaciones de datos en memoria grandes. La lógica compleja o con estado puede provocar errores de serialización o cuellos de botella en el rendimiento.
  2. Supervise su carpeta de puntos de control: Para las consultas de streaming, SDP administra los puntos de control por flujo, no por destino. Si tiene varios flujos en la canalización, cada flujo tiene su propio directorio de punto de control.
  3. Validar dependencias externas: si se basa en sistemas o bibliotecas externos, compruebe que están instalados en todos los nodos del clúster o en el contenedor.
  4. Tenga en cuenta Databricks Connect: Si su entorno puede pasar a Databricks Connect en el futuro, compruebe que su código sea serializable y que no dependa de dbutils dentro de la foreach_batch_sink UDF.

Limitaciones

  • No hay limpieza para ForEachBatch: dado que el código de Python personalizado puede escribir datos en cualquier lugar, la canalización no puede limpiar ni realizar un seguimiento de esos datos. Debe gestionar sus propias directivas de administración o retención de datos de los destinos a los que escribe.
  • Métricas en microlote: Las canalizaciones recopilan métricas de transmisión, pero algunos escenarios pueden provocar métricas incompletas o inusuales al usar ForEachBatch. Esto se debe a la flexibilidad subyacente de ForEachBatch, lo que dificulta el seguimiento del flujo de datos y las filas del sistema.
  • Compatibilidad con la escritura en varios destinos sin varias lecturas: algunos clientes pueden usar ForEachBatch para leer desde un origen una vez y luego escribir en varios destinos. Para lograr esto, debe incluir df.persist o df.cache dentro de su función ForEachBatch. Con estas opciones, Azure Databricks intentará preparar los datos solo una sola vez. Sin estas opciones, la consulta dará lugar a varias lecturas. Esto no se incluye en los ejemplos de código siguientes.
  • Uso con Databricks Connect: si la canalización se ejecuta en Databricks Connect, foreachBatch las funciones definidas por el usuario (UDF) deben ser serializables y no pueden usar dbutils. La canalización genera advertencias si detecta una UDF no serializable, pero no produce un error en la canalización.
  • Lógica no serializable: El código que hace referencia a objetos locales, clases o recursos no serializables puede romperse en contextos de Databricks Connect. Use módulos de Python puros y confirme que las referencias (por ejemplo, dbutils) no se usan si Databricks Connect es un requisito.

Examples

Ejemplo de sintaxis básica

from pyspark import pipelines as dp

# Create a ForEachBatch sink
@dp.foreach_batch_sink(name = "my_foreachbatch_sink")
def feb_sink(df, batch_id):
  # Custom logic here. You can perform merges,
  # write to multiple destinations, etc.
  return

# Create source data for example:
@dp.table()
def example_source_data():
  return spark.range(5)

# Add sink to an append flow:
@dp.append_flow(
    target="my_foreachbatch_sink",
)
def my_flow():
  return spark.readStream.format("delta").table("example_source_data")

Uso de datos de ejemplo para una canalización sencilla

En este ejemplo se utiliza la muestra del NYC Taxi. Se supone que el administrador del área de trabajo ha habilitado el catálogo de conjuntos de datos públicos de Databricks. Para el receptor, cambie my_catalog.my_schema a un catálogo y un esquema a los que tenga acceso.

from pyspark import pipelines as dp
from pyspark.sql.functions import current_timestamp

# Create foreachBatch sink
@dp.foreach_batch_sink(name = "my_foreach_sink")
def my_foreach_sink(df, batch_id):
    # Custom logic here. You can perform merges,
    # write to multiple destinations, etc.
    # For this example, we are adding a timestamp column.
    enriched = df.withColumn("processed_timestamp", current_timestamp())
    # Write to a Delta location
    enriched.write \
      .format("delta") \
      .mode("append") \
      .saveAsTable("my_catalog.my_schema.trips_sink_delta")
    # Return is optional here, but generally not used for the sink
    return

# Create an append flow that reads sample data,
# and sends it to the ForEachBatch sink
@dp.append_flow(
    target="my_foreach_sink",
)
def taxi_source():
  df = spark.readStream.table("samples.nyctaxi.trips")
  return df

Escribir a varios destinos

En este ejemplo se escribe a varios destinos. Demuestra el uso de txnVersion y txnAppId para realizar operaciones de escritura idempotentes en las tablas de Delta Lake. Para obtener más información, consulte Escrituras de tabla idempotentes en foreachBatch.

Supongamos que estamos escribiendo a dos tablas, table_a y table_b, y supongamos que dentro de un lote, la escritura en table_a se realiza correctamente mientras se produce un error en la escritura en table_b. Cuando se vuelve a ejecutar el lote, el par (txnVersion, txnAppId) permitirá que Delta ignore la escritura duplicada en table_a y solo escriba el lote en table_b.

from pyspark import pipelines as dp

app_id = "my-app-name" # different applications that write to the same table should have unique txnAppId

# Create the ForEachBatch sink
@dp.foreach_batch_sink(name="user_events_feb")
def user_events_handler(df, batch_id):
    # Optionally do transformations, logging, or merging logic
    # ...

    # Write to a Delta table
    df.write \
     .format("delta") \
     .mode("append") \
     .option("txnVersion", batch_id) \
     .option("txnAppId", app_id) \
     .saveAsTable("my_catalog.my_schema.example_table_1")

    # Also write to a JSON file location
    df.write \
      .format("json") \
      .mode("append") \
      .option("txnVersion", batch_id) \
      .option("txnAppId", app_id) \
      .save("/tmp/json_target")
    return

# Create source data for example
@dp.table()
def example_source():
  return spark.range(5)


# Create the append flow, and target the ForEachBatch sink
@dp.append_flow(target="user_events_feb", name="user_events_flow")
def read_user_events():
    return spark.readStream.format("delta").table("example_source")

Uso de spark.sql()

Puede usar spark.sql() en el receptor ForEachBatch, como en el ejemplo siguiente.

from pyspark import pipelines as dp
from pyspark.sql import Row

@dp.foreach_batch_sink(name = "example_sink")
def feb_sink(df, batch_id):
  df.createOrReplaceTempView("df_view")
  df.sparkSession.sql("MERGE INTO target_table AS tgt " +
            "USING df_view AS src ON tgt.id = src.id " +
            "WHEN MATCHED THEN UPDATE SET tgt.id = src.id * 10 " +
            "WHEN NOT MATCHED THEN INSERT (id) VALUES (id)"
          )
  return

# Create target delta table
spark.range(5).write.format("delta").mode("overwrite").saveAsTable("target_table")

# Create source table
@dp.table()
def src_table():
  return spark.range(5)

@dp.append_flow(
    target="example_sink",
)
def example_flow():
  return spark.readStream.format("delta").table("source_table")

Preguntas más frecuentes (FAQ)

¿Puedo usar dbutils en mi receptor ForEachBatch?

Si tiene previsto ejecutar la canalización en un entorno que no sea Databricks Connect, dbutils puede funcionar. Sin embargo, si usa Databricks Connect, dbutils no es accesible dentro de su función foreachBatch. La tubería puede generar advertencias si detecta el uso de dbutils para ayudarle a evitar interrupciones.

¿Puedo usar varios flujos con un único receptor ForEachBatch?

Sí. Puede definir varios flujos (con @dp.append_flow) que tienen como destino el mismo nombre de destino, pero cada uno mantiene sus propios puntos de control.

¿La tubería gestiona la retención o limpieza de datos para mi objetivo?

No. Debido a que el sink ForEachBatch puede escribir en cualquier ubicación o sistema arbitrarios, la canalización no puede gestionar ni eliminar automáticamente datos en ese destino. Debe controlar estas operaciones como parte del código personalizado o los procesos externos.

¿Cómo puedo solucionar errores de serialización o errores en mi función ForEachBatch?

Examine los registros del controlador de clúster o los registros de eventos de canalización. En el caso de los problemas de serialización relacionados con Spark Connect, compruebe que la función depende solo de objetos de Python serializables y no hace referencia a objetos no permitidos (como identificadores de archivo abiertos o dbutils).