DataSourceWriter

Klasa bazowa dla składników zapisywania źródeł danych.

Autorzy źródeł danych są odpowiedzialni za zapisywanie danych w źródle danych. Zaimplementuj tę klasę i zwróć wystąpienie z DataSource.writer() , aby można było zapisywać źródło danych.

Dodano w środowisku Databricks Runtime 14.3 LTS

Składnia

from pyspark.sql.datasource import DataSourceWriter

class MyDataSourceWriter(DataSourceWriter):
    def write(self, iterator):
        ...

Methods

Metoda Opis
write(iterator) Zapisuje dane w źródle danych. Wywoływane raz dla każdego modułu wykonawczego. Akceptuje iterator Row obiektów i zwraca WriterCommitMessagewartość , lub None jeśli nie ma komunikatu zatwierdzenia. Ta metoda jest abstrakcyjna i musi zostać zaimplementowana.
commit(messages) Zatwierdza zadanie zapisywania przy użyciu listy komunikatów zatwierdzeń zebranych ze wszystkich funkcji wykonawczych. Wywoływane na sterowniku, gdy wszystkie zadania są uruchamiane pomyślnie.
abort(messages) Przerywa zadanie zapisywania przy użyciu listy komunikatów zatwierdzenia zebranych ze wszystkich funkcji wykonawczych. Wywoływane na sterowniku, gdy co najmniej jedno zadanie nie powiodło się.

Notatki

  • Sterownik zbiera komunikaty zatwierdzenia ze wszystkich funkcji wykonawczych i przekazuje je, commit() jeśli wszystkie zadania kończą się powodzeniem lub abort() jeśli jakiekolwiek zadanie zakończy się niepowodzeniem.
  • Jeśli zadanie zapisu zakończy się niepowodzeniem, jego komunikat zatwierdzenia będzie znajdować się None na liście przekazanej do commit() lub abort().

Examples

Zaimplementuj podstawowy moduł zapisywania, który zapisuje wiersze w pliku:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceWriter(DataSourceWriter):
    def __init__(self, options):
        self.path = options.get("path")

    def write(self, iterator):
        rows = list(iterator)
        with open(self.path, "w") as f:
            for row in rows:
                f.write(str(row) + "\n")
        return MyCommitMessage(num_rows=len(rows))

    def commit(self, messages):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed {total} rows")

    def abort(self, messages):
        print("Write job failed, performing cleanup")