Compartir a través de


DataSourceStreamArrowWriter

Clase base para los escritores de flujos de datos que procesan los datos mediante pyArrow RecordBatch.

A diferencia DataSourceStreamWriterde , que funciona con un iterador de objetos Spark Row , esta clase está optimizada para el formato Arrow al escribir datos de streaming. Puede ofrecer un mejor rendimiento al interactuar con sistemas o bibliotecas que admiten de forma nativa Arrow para casos de uso de streaming. Implemente esta clase y devuelva una instancia de DataSource.streamWriter() para que un origen de datos se pueda escribir como receptor de streaming mediante Arrow.

Sintaxis

from pyspark.sql.datasource import DataSourceStreamArrowWriter

class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
    def write(self, iterator):
        ...

Methods

Método Descripción
write(iterator) Escribe un iterador de objetos PyArrow RecordBatch en el receptor de streaming. Se llama a en ejecutores una vez por microbatch. Devuelve un , WriterCommitMessageo None si no hay ningún mensaje de confirmación. Este método es abstracto y debe implementarse.
commit(messages, batchId) Confirma el microbatch mediante una lista de mensajes de confirmación recopilados de todos los ejecutores. Se invoca en el controlador cuando todas las tareas del microbatch se ejecutan correctamente. Se hereda de DataSourceStreamWriter.
abort(messages, batchId) Anula el microbatch mediante una lista de mensajes de confirmación recopilados de todos los ejecutores. Se invoca en el controlador cuando se produce un error en una o varias tareas del microbatch. Se hereda de DataSourceStreamWriter.

Notas

  • El controlador recopila mensajes de confirmación de todos los ejecutores y los pasa a commit() si todas las tareas se realizan correctamente o a abort() si se produce un error en alguna tarea.
  • Si se produce un error en una tarea de escritura, su mensaje de confirmación estará None en la lista pasada a commit() o abort().
  • batchId identifica de forma única cada microbatch e incrementa en 1 con cada microbatch procesado.

Ejemplos

Implemente un sistema de escritura de secuencias basado en flechas que cuente filas por microbatch:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamArrowWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
    def write(self, iterator):
        total_rows = 0
        for batch in iterator:
            total_rows += len(batch)
        return MyCommitMessage(num_rows=total_rows)

    def commit(self, messages, batchId):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed batch {batchId} with {total} rows")

    def abort(self, messages, batchId):
        print(f"Batch {batchId} failed, performing cleanup")