Condividi tramite


DataSourceStreamArrowWriter

Classe di base per i writer di flussi di dati che elaborano i dati usando pyarrow.RecordBatch

A differenza di DataSourceStreamWriter, che funziona con un iteratore di oggetti Spark Row , questa classe è ottimizzata per il formato Arrow durante la scrittura di dati di streaming. Può offrire prestazioni migliori quando si interagisce con sistemi o librerie che supportano in modo nativo Arrow per i casi d'uso di streaming. Implementare questa classe e restituire un'istanza da DataSource.streamWriter() per rendere scrivibile un'origine dati come sink di streaming usando Arrow.

Sintassi

from pyspark.sql.datasource import DataSourceStreamArrowWriter

class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
    def write(self, iterator):
        ...

Methods

metodo Descrizione
write(iterator) Scrive un iteratore di oggetti PyArrow RecordBatch nel sink di streaming. Chiamato su executor una volta per microbatch. Restituisce un WriterCommitMessageoggetto o None se non è presente alcun messaggio di commit. Questo metodo è astratto e deve essere implementato.
commit(messages, batchId) Esegue il commit del microbatch usando un elenco di messaggi di commit raccolti da tutti gli executor. Richiamato sul driver quando tutte le attività nel microbatch vengono eseguite correttamente. Ereditato da DataSourceStreamWriter.
abort(messages, batchId) Interrompe il microbatch usando un elenco di messaggi di commit raccolti da tutti gli executor. Richiamato sul driver quando una o più attività nel microbatch non sono riuscite. Ereditato da DataSourceStreamWriter.

Note

  • Il driver raccoglie i messaggi di commit da tutti gli executor e li passa a commit() se tutte le attività hanno esito positivo o a abort() se un'attività ha esito negativo.
  • Se un'attività di scrittura non riesce, il messaggio di commit sarà None incluso nell'elenco passato a commit() o abort().
  • batchId identifica in modo univoco ogni microbatch e incrementa di 1 con ogni microbatch elaborato.

Examples

Implementare un writer di flusso basato su freccia che conta le righe per microbatch:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamArrowWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
    def write(self, iterator):
        total_rows = 0
        for batch in iterator:
            total_rows += len(batch)
        return MyCommitMessage(num_rows=total_rows)

    def commit(self, messages, batchId):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed batch {batchId} with {total} rows")

    def abort(self, messages, batchId):
        print(f"Batch {batchId} failed, performing cleanup")