Oharra
Baimena behar duzu orria atzitzeko. Direktorioetan saioa has dezakezu edo haiek alda ditzakezu.
Baimena behar duzu orria atzitzeko. Direktorioak alda ditzakezu.
Clase base para los escritores de orígenes de datos que procesan datos mediante pyArrow.RecordBatch
A diferencia DataSourceWriterde , que funciona con un iterador de objetos Spark Row , esta clase está optimizada para el formato Arrow al escribir datos. Puede ofrecer un mejor rendimiento al interactuar con sistemas o bibliotecas que admiten de forma nativa Arrow. Implemente esta clase y devuelva una instancia de DataSource.writer() para que un origen de datos se pueda escribir mediante Arrow.
Sintaxis
from pyspark.sql.datasource import DataSourceArrowWriter
class MyDataSourceArrowWriter(DataSourceArrowWriter):
def write(self, iterator):
...
Methods
| Método | Descripción |
|---|---|
write(iterator) |
Escribe un iterador de objetos PyArrow RecordBatch en el receptor. Se llama una vez en cada ejecutor. Devuelve un , WriterCommitMessageo None si no hay ningún mensaje de confirmación. Este método es abstracto y debe implementarse. |
commit(messages) |
Confirma el trabajo de escritura mediante una lista de mensajes de confirmación recopilados de todos los ejecutores. Se invoca en el controlador cuando todas las tareas se ejecutan correctamente. Se hereda de DataSourceWriter. |
abort(messages) |
Anula el trabajo de escritura mediante una lista de mensajes de confirmación recopilados de todos los ejecutores. Se invoca en el controlador cuando se produce un error en una o varias tareas. Se hereda de DataSourceWriter. |
Notas
- El controlador recopila mensajes de confirmación de todos los ejecutores y los pasa a
commit()si todas las tareas se realizan correctamente o aabort()si se produce un error en alguna tarea. - Si se produce un error en una tarea de escritura, su mensaje de confirmación estará
Noneen la lista pasada acommit()oabort().
Ejemplos
Implemente un escritor basado en flechas que cuente filas en todos los lotes:
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceArrowWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceArrowWriter(DataSourceArrowWriter):
def write(self, iterator):
total_rows = 0
for batch in iterator:
total_rows += len(batch)
return MyCommitMessage(num_rows=total_rows)
def commit(self, messages):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed {total} rows")
def abort(self, messages):
print("Write job failed, performing cleanup")