Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Eine Basisklasse für Datenstromautoren.
Datenstromautoren sind dafür verantwortlich, Daten in eine Streaming-Spüle zu schreiben. Implementieren Sie diese Klasse, und geben Sie eine Instanz zurück DataSource.streamWriter() , um eine Datenquelle als Streaming-Sink schreibbar zu machen.
write() wird für jeden Mikrobatch aufgerufen und commit()abort() oder nach Abschluss aller Aufgaben im Mikrobatch an den Treiber angerufen.
Syntax
from pyspark.sql.datasource import DataSourceStreamWriter
class MyDataSourceStreamWriter(DataSourceStreamWriter):
def write(self, iterator):
...
Methodik
| Methode | Beschreibung |
|---|---|
write(iterator) |
Schreibt Daten in die Streaming-Spüle. Wird einmal pro Mikrobatch für Ausführende aufgerufen. Akzeptiert einen Iterator von Row Objekten und gibt eine WriterCommitMessageOder None , wenn keine Commit-Nachricht vorhanden ist. Diese Methode ist abstrakt und muss implementiert werden. |
commit(messages, batchId) |
Führt einen Commit für den Mikrobatch mithilfe einer Liste von Commitnachrichten durch, die von allen Executoren gesammelt wurden. Wird auf dem Treiber aufgerufen, wenn alle Aufgaben im Mikrobatch erfolgreich ausgeführt werden. |
abort(messages, batchId) |
Bricht die Mikrobatch mithilfe einer Liste der commit-Nachrichten ab, die von allen Executoren gesammelt wurden. Wird auf dem Treiber aufgerufen, wenn mindestens eine Aufgabe im Mikrobatch fehlgeschlagen ist. |
Hinweise
- Der Treiber sammelt Commit-Nachrichten von allen Executoren und übergibt sie an
commit(), wenn alle Aufgaben erfolgreich sind oderabort()wenn eine Aufgabe fehlschlägt. - Wenn eine Schreibaufgabe fehlschlägt, befindet
Nonesich die Commit-Nachricht in der Liste, die ancommit()oderabort(). -
batchIdidentifiziert jedes Mikrobatch und Schritt um 1, wobei jeder Mikrobatch verarbeitet wird.
Beispiele
Implementieren Sie einen Stream Writer, der Zeilen an eine Datei ansetzt:
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.path = options.get("path")
def write(self, iterator):
rows = list(iterator)
with open(self.path, "a") as f:
for row in rows:
f.write(str(row) + "\n")
return MyCommitMessage(num_rows=len(rows))
def commit(self, messages, batchId):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed batch {batchId} with {total} rows")
def abort(self, messages, batchId):
print(f"Batch {batchId} failed, performing cleanup")