Partekatu honen bidez:


DataSourceArrowWriter

Clase base para los escritores de orígenes de datos que procesan datos mediante pyArrow.RecordBatch

A diferencia DataSourceWriterde , que funciona con un iterador de objetos Spark Row , esta clase está optimizada para el formato Arrow al escribir datos. Puede ofrecer un mejor rendimiento al interactuar con sistemas o bibliotecas que admiten de forma nativa Arrow. Implemente esta clase y devuelva una instancia de DataSource.writer() para que un origen de datos se pueda escribir mediante Arrow.

Sintaxis

from pyspark.sql.datasource import DataSourceArrowWriter

class MyDataSourceArrowWriter(DataSourceArrowWriter):
    def write(self, iterator):
        ...

Methods

Método Descripción
write(iterator) Escribe un iterador de objetos PyArrow RecordBatch en el receptor. Se llama una vez en cada ejecutor. Devuelve un , WriterCommitMessageo None si no hay ningún mensaje de confirmación. Este método es abstracto y debe implementarse.
commit(messages) Confirma el trabajo de escritura mediante una lista de mensajes de confirmación recopilados de todos los ejecutores. Se invoca en el controlador cuando todas las tareas se ejecutan correctamente. Se hereda de DataSourceWriter.
abort(messages) Anula el trabajo de escritura mediante una lista de mensajes de confirmación recopilados de todos los ejecutores. Se invoca en el controlador cuando se produce un error en una o varias tareas. Se hereda de DataSourceWriter.

Notas

  • El controlador recopila mensajes de confirmación de todos los ejecutores y los pasa a commit() si todas las tareas se realizan correctamente o a abort() si se produce un error en alguna tarea.
  • Si se produce un error en una tarea de escritura, su mensaje de confirmación estará None en la lista pasada a commit() o abort().

Ejemplos

Implemente un escritor basado en flechas que cuente filas en todos los lotes:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceArrowWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceArrowWriter(DataSourceArrowWriter):
    def write(self, iterator):
        total_rows = 0
        for batch in iterator:
            total_rows += len(batch)
        return MyCommitMessage(num_rows=total_rows)

    def commit(self, messages):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed {total} rows")

    def abort(self, messages):
        print("Write job failed, performing cleanup")