Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Klasa podstawowa dla składników zapisywania strumienia danych.
Autorzy strumieni danych są odpowiedzialni za zapisywanie danych w ujściu przesyłania strumieniowego. Zaimplementuj tę klasę i zwróć wystąpienie z DataSource.streamWriter() , aby źródło danych było zapisywalne jako ujście przesyłania strumieniowego.
write() Jest wywoływany na modułach wykonawczych dla każdego mikrobajta i commit()abort() jest wywoływany na sterowniku po zakończeniu wszystkich zadań w mikrobajtach.
Składnia
from pyspark.sql.datasource import DataSourceStreamWriter
class MyDataSourceStreamWriter(DataSourceStreamWriter):
def write(self, iterator):
...
Methods
| Metoda | Opis |
|---|---|
write(iterator) |
Zapisuje dane w ujściu przesyłania strumieniowego. Wywoływane na moduły wykonawcze raz na mikrobajt. Akceptuje iterator Row obiektów i zwraca WriterCommitMessagewartość , lub None jeśli nie ma komunikatu zatwierdzenia. Ta metoda jest abstrakcyjna i musi zostać zaimplementowana. |
commit(messages, batchId) |
Zatwierdza mikrobajt przy użyciu listy komunikatów zatwierdzenia zebranych ze wszystkich funkcji wykonawczych. Wywołana na sterowniku, gdy wszystkie zadania w mikrobajtach są uruchamiane pomyślnie. |
abort(messages, batchId) |
Przerywa mikrobajt przy użyciu listy komunikatów zatwierdzenia zebranych ze wszystkich funkcji wykonawczych. Wywoływane na sterowniku, gdy co najmniej jedno zadanie w mikrobajtach nie powiodło się. |
Notatki
- Sterownik zbiera komunikaty zatwierdzenia ze wszystkich funkcji wykonawczych i przekazuje je,
commit()jeśli wszystkie zadania kończą się powodzeniem lubabort()jeśli jakiekolwiek zadanie zakończy się niepowodzeniem. - Jeśli zadanie zapisu zakończy się niepowodzeniem, jego komunikat zatwierdzenia będzie znajdować się
Nonena liście przekazanej docommit()lubabort(). -
batchIdunikatowo identyfikuje każdy mikrobajt i zwiększa się o 1 przy każdym przetworzonym mikrobajtach.
Examples
Zaimplementuj moduł zapisywania strumienia, który dołącza wiersze do pliku:
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.path = options.get("path")
def write(self, iterator):
rows = list(iterator)
with open(self.path, "a") as f:
for row in rows:
f.write(str(row) + "\n")
return MyCommitMessage(num_rows=len(rows))
def commit(self, messages, batchId):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed batch {batchId} with {total} rows")
def abort(self, messages, batchId):
print(f"Batch {batchId} failed, performing cleanup")