Freigeben über


DataSourceStreamArrowWriter

Eine Basisklasse für Datenstromautoren, die Daten mithilfe von PyArrow verarbeiten RecordBatch.

Im Gegensatz dazu DataSourceStreamWriter, was mit einem Iterator von Spark-Objekten Row funktioniert, ist diese Klasse beim Schreiben von Streamingdaten für das Arrow-Format optimiert. Sie kann eine bessere Leistung bieten, wenn sie mit Systemen oder Bibliotheken interfaciert wird, die Arrow für Streaming-Anwendungsfälle nativ unterstützen. Implementieren Sie diese Klasse, und geben Sie eine Instanz zurück DataSource.streamWriter() , um eine Datenquelle mit Pfeil als Streamingsenke schreibbar zu machen.

Syntax

from pyspark.sql.datasource import DataSourceStreamArrowWriter

class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
    def write(self, iterator):
        ...

Methodik

Methode Beschreibung
write(iterator) Schreibt einen Iterator von PyArrow-Objekten RecordBatch in die Streaming-Spüle. Wird einmal pro Mikrobatch für Ausführende aufgerufen. Gibt eine WriterCommitMessage, oder None wenn keine Commit-Nachricht vorhanden ist. Diese Methode ist abstrakt und muss implementiert werden.
commit(messages, batchId) Führt einen Commit für den Mikrobatch mithilfe einer Liste von Commitnachrichten durch, die von allen Executoren gesammelt wurden. Wird auf dem Treiber aufgerufen, wenn alle Aufgaben im Mikrobatch erfolgreich ausgeführt werden. Wird von DataSourceStreamWriter geerbt.
abort(messages, batchId) Bricht die Mikrobatch mithilfe einer Liste der commit-Nachrichten ab, die von allen Executoren gesammelt wurden. Wird auf dem Treiber aufgerufen, wenn mindestens eine Aufgabe im Mikrobatch fehlgeschlagen ist. Wird von DataSourceStreamWriter geerbt.

Hinweise

  • Der Treiber sammelt Commit-Nachrichten von allen Executoren und übergibt sie an commit() , wenn alle Aufgaben erfolgreich sind oder abort() wenn eine Aufgabe fehlschlägt.
  • Wenn eine Schreibaufgabe fehlschlägt, befindet None sich die Commit-Nachricht in der Liste, die an commit() oder abort().
  • batchId identifiziert jedes Mikrobatch und Schritt um 1, wobei jeder Mikrobatch verarbeitet wird.

Beispiele

Implementieren Sie einen Pfeilbasierten Stream writer, der Zeilen pro Mikrobatch zählt:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamArrowWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
    def write(self, iterator):
        total_rows = 0
        for batch in iterator:
            total_rows += len(batch)
        return MyCommitMessage(num_rows=total_rows)

    def commit(self, messages, batchId):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed batch {batchId} with {total} rows")

    def abort(self, messages, batchId):
        print(f"Batch {batchId} failed, performing cleanup")