Nota
L'accés a aquesta pàgina requereix autorització. Podeu provar d'iniciar la sessió o de canviar els directoris.
L'accés a aquesta pàgina requereix autorització. Podeu provar de canviar els directoris.
Clase base para escritores de flujos de datos.
Los escritores de flujos de datos son responsables de escribir datos en un receptor de streaming. Implemente esta clase y devuelva una instancia de DataSource.streamWriter() para que un origen de datos se pueda escribir como receptor de streaming.
write() se llama a en ejecutores para cada microbatch y commit() se llama a en abort() el controlador después de que se completen todas las tareas del microbatch.
Sintaxis
from pyspark.sql.datasource import DataSourceStreamWriter
class MyDataSourceStreamWriter(DataSourceStreamWriter):
def write(self, iterator):
...
Methods
| Método | Descripción |
|---|---|
write(iterator) |
Escribe datos en el receptor de streaming. Se llama a en ejecutores una vez por microbatch. Acepta un iterador de Row objetos y devuelve un WriterCommitMessage, o None si no hay ningún mensaje de confirmación. Este método es abstracto y debe implementarse. |
commit(messages, batchId) |
Confirma el microbatch mediante una lista de mensajes de confirmación recopilados de todos los ejecutores. Se invoca en el controlador cuando todas las tareas del microbatch se ejecutan correctamente. |
abort(messages, batchId) |
Anula el microbatch mediante una lista de mensajes de confirmación recopilados de todos los ejecutores. Se invoca en el controlador cuando se produce un error en una o varias tareas del microbatch. |
Notas
- El controlador recopila mensajes de confirmación de todos los ejecutores y los pasa a
commit()si todas las tareas se realizan correctamente o aabort()si se produce un error en alguna tarea. - Si se produce un error en una tarea de escritura, su mensaje de confirmación estará
Noneen la lista pasada acommit()oabort(). -
batchIdidentifica de forma única cada microbatch e incrementa en 1 con cada microbatch procesado.
Ejemplos
Implemente un sistema de escritura de secuencias que anexe filas a un archivo:
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.path = options.get("path")
def write(self, iterator):
rows = list(iterator)
with open(self.path, "a") as f:
for row in rows:
f.write(str(row) + "\n")
return MyCommitMessage(num_rows=len(rows))
def commit(self, messages, batchId):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed batch {batchId} with {total} rows")
def abort(self, messages, batchId):
print(f"Batch {batchId} failed, performing cleanup")