Oharra
Baimena behar duzu orria atzitzeko. Direktorioetan saioa has dezakezu edo haiek alda ditzakezu.
Baimena behar duzu orria atzitzeko. Direktorioak alda ditzakezu.
Clase base para los escritores de flujos de datos que procesan los datos mediante pyArrow RecordBatch.
A diferencia DataSourceStreamWriterde , que funciona con un iterador de objetos Spark Row , esta clase está optimizada para el formato Arrow al escribir datos de streaming. Puede ofrecer un mejor rendimiento al interactuar con sistemas o bibliotecas que admiten de forma nativa Arrow para casos de uso de streaming. Implemente esta clase y devuelva una instancia de DataSource.streamWriter() para que un origen de datos se pueda escribir como receptor de streaming mediante Arrow.
Sintaxis
from pyspark.sql.datasource import DataSourceStreamArrowWriter
class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
def write(self, iterator):
...
Methods
| Método | Descripción |
|---|---|
write(iterator) |
Escribe un iterador de objetos PyArrow RecordBatch en el receptor de streaming. Se llama a en ejecutores una vez por microbatch. Devuelve un , WriterCommitMessageo None si no hay ningún mensaje de confirmación. Este método es abstracto y debe implementarse. |
commit(messages, batchId) |
Confirma el microbatch mediante una lista de mensajes de confirmación recopilados de todos los ejecutores. Se invoca en el controlador cuando todas las tareas del microbatch se ejecutan correctamente. Se hereda de DataSourceStreamWriter. |
abort(messages, batchId) |
Anula el microbatch mediante una lista de mensajes de confirmación recopilados de todos los ejecutores. Se invoca en el controlador cuando se produce un error en una o varias tareas del microbatch. Se hereda de DataSourceStreamWriter. |
Notas
- El controlador recopila mensajes de confirmación de todos los ejecutores y los pasa a
commit()si todas las tareas se realizan correctamente o aabort()si se produce un error en alguna tarea. - Si se produce un error en una tarea de escritura, su mensaje de confirmación estará
Noneen la lista pasada acommit()oabort(). -
batchIdidentifica de forma única cada microbatch e incrementa en 1 con cada microbatch procesado.
Ejemplos
Implemente un sistema de escritura de secuencias basado en flechas que cuente filas por microbatch:
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamArrowWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
def write(self, iterator):
total_rows = 0
for batch in iterator:
total_rows += len(batch)
return MyCommitMessage(num_rows=total_rows)
def commit(self, messages, batchId):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed batch {batchId} with {total} rows")
def abort(self, messages, batchId):
print(f"Batch {batchId} failed, performing cleanup")