Udostępnij za pośrednictwem


DataSourceStreamWriter

Klasa podstawowa dla składników zapisywania strumienia danych.

Autorzy strumieni danych są odpowiedzialni za zapisywanie danych w ujściu przesyłania strumieniowego. Zaimplementuj tę klasę i zwróć wystąpienie z DataSource.streamWriter() , aby źródło danych było zapisywalne jako ujście przesyłania strumieniowego. write() Jest wywoływany na modułach wykonawczych dla każdego mikrobajta i commit()abort() jest wywoływany na sterowniku po zakończeniu wszystkich zadań w mikrobajtach.

Składnia

from pyspark.sql.datasource import DataSourceStreamWriter

class MyDataSourceStreamWriter(DataSourceStreamWriter):
    def write(self, iterator):
        ...

Methods

Metoda Opis
write(iterator) Zapisuje dane w ujściu przesyłania strumieniowego. Wywoływane na moduły wykonawcze raz na mikrobajt. Akceptuje iterator Row obiektów i zwraca WriterCommitMessagewartość , lub None jeśli nie ma komunikatu zatwierdzenia. Ta metoda jest abstrakcyjna i musi zostać zaimplementowana.
commit(messages, batchId) Zatwierdza mikrobajt przy użyciu listy komunikatów zatwierdzenia zebranych ze wszystkich funkcji wykonawczych. Wywołana na sterowniku, gdy wszystkie zadania w mikrobajtach są uruchamiane pomyślnie.
abort(messages, batchId) Przerywa mikrobajt przy użyciu listy komunikatów zatwierdzenia zebranych ze wszystkich funkcji wykonawczych. Wywoływane na sterowniku, gdy co najmniej jedno zadanie w mikrobajtach nie powiodło się.

Notatki

  • Sterownik zbiera komunikaty zatwierdzenia ze wszystkich funkcji wykonawczych i przekazuje je, commit() jeśli wszystkie zadania kończą się powodzeniem lub abort() jeśli jakiekolwiek zadanie zakończy się niepowodzeniem.
  • Jeśli zadanie zapisu zakończy się niepowodzeniem, jego komunikat zatwierdzenia będzie znajdować się None na liście przekazanej do commit() lub abort().
  • batchId unikatowo identyfikuje każdy mikrobajt i zwiększa się o 1 przy każdym przetworzonym mikrobajtach.

Examples

Zaimplementuj moduł zapisywania strumienia, który dołącza wiersze do pliku:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceStreamWriter(DataSourceStreamWriter):
    def __init__(self, options):
        self.path = options.get("path")

    def write(self, iterator):
        rows = list(iterator)
        with open(self.path, "a") as f:
            for row in rows:
                f.write(str(row) + "\n")
        return MyCommitMessage(num_rows=len(rows))

    def commit(self, messages, batchId):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed batch {batchId} with {total} rows")

    def abort(self, messages, batchId):
        print(f"Batch {batchId} failed, performing cleanup")