Freigeben über


DataSourceArrowWriter

Eine Basisklasse für Datenquellenautoren, die Daten mithilfe von PyArrow verarbeiten RecordBatch.

Anders als DataSourceWriterbei einem Iterator von Spark-Objekten Row ist diese Klasse beim Schreiben von Daten für das Pfeilformat optimiert. Sie kann eine bessere Leistung bieten, wenn sie mit Systemen oder Bibliotheken interfaciert wird, die Pfeil nativ unterstützen. Implementieren Sie diese Klasse, und geben Sie eine Instanz zurück DataSource.writer() , um eine Datenquelle mithilfe von Arrow schreibbar zu machen.

Syntax

from pyspark.sql.datasource import DataSourceArrowWriter

class MyDataSourceArrowWriter(DataSourceArrowWriter):
    def write(self, iterator):
        ...

Methodik

Methode Beschreibung
write(iterator) Schreibt einen Iterator von PyArrow RecordBatch -Objekten in die Spüle. Wird einmal für jeden Executor aufgerufen. Gibt eine WriterCommitMessage, oder None wenn keine Commit-Nachricht vorhanden ist. Diese Methode ist abstrakt und muss implementiert werden.
commit(messages) Führt einen Commit für den Schreibauftrag mithilfe einer Liste von Commitnachrichten durch, die von allen Executoren gesammelt wurden. Wird auf dem Treiber aufgerufen, wenn alle Aufgaben erfolgreich ausgeführt werden. Wird von DataSourceWriter geerbt.
abort(messages) Bricht den Schreibauftrag mithilfe einer Liste von Commit-Nachrichten ab, die von allen Executoren gesammelt wurden. Wird auf dem Treiber aufgerufen, wenn mindestens eine Aufgabe fehlgeschlagen ist. Wird von DataSourceWriter geerbt.

Hinweise

  • Der Treiber sammelt Commit-Nachrichten von allen Executoren und übergibt sie an commit() , wenn alle Aufgaben erfolgreich sind oder abort() wenn eine Aufgabe fehlschlägt.
  • Wenn eine Schreibaufgabe fehlschlägt, befindet None sich die Commit-Nachricht in der Liste, die an commit() oder abort().

Beispiele

Implementieren Sie einen Pfeilbasierten Writer, der Zeilen über alle Batches zählt:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceArrowWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceArrowWriter(DataSourceArrowWriter):
    def write(self, iterator):
        total_rows = 0
        for batch in iterator:
            total_rows += len(batch)
        return MyCommitMessage(num_rows=total_rows)

    def commit(self, messages):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed {total} rows")

    def abort(self, messages):
        print("Write job failed, performing cleanup")