Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Classe di base per i writer di flussi di dati.
I writer di flussi di dati sono responsabili della scrittura di dati in un sink di streaming. Implementare questa classe e restituire un'istanza da DataSource.streamWriter() per rendere scrivibile un'origine dati come sink di streaming.
write() viene chiamato su executor per ogni microbatch e commit() viene abort() chiamato sul driver dopo il completamento di tutte le attività nel microbatch.
Sintassi
from pyspark.sql.datasource import DataSourceStreamWriter
class MyDataSourceStreamWriter(DataSourceStreamWriter):
def write(self, iterator):
...
Methods
| metodo | Descrizione |
|---|---|
write(iterator) |
Scrive i dati nel sink di streaming. Chiamato su executor una volta per microbatch. Accetta un iteratore di Row oggetti e restituisce un WriterCommitMessageoggetto o None se non è presente alcun messaggio di commit. Questo metodo è astratto e deve essere implementato. |
commit(messages, batchId) |
Esegue il commit del microbatch usando un elenco di messaggi di commit raccolti da tutti gli executor. Richiamato sul driver quando tutte le attività nel microbatch vengono eseguite correttamente. |
abort(messages, batchId) |
Interrompe il microbatch usando un elenco di messaggi di commit raccolti da tutti gli executor. Richiamato sul driver quando una o più attività nel microbatch non sono riuscite. |
Note
- Il driver raccoglie i messaggi di commit da tutti gli executor e li passa a
commit()se tutte le attività hanno esito positivo o aabort()se un'attività ha esito negativo. - Se un'attività di scrittura non riesce, il messaggio di commit sarà
Noneincluso nell'elenco passato acommit()oabort(). -
batchIdidentifica in modo univoco ogni microbatch e incrementa di 1 con ogni microbatch elaborato.
Examples
Implementare un writer di flusso che accoda righe a un file:
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.path = options.get("path")
def write(self, iterator):
rows = list(iterator)
with open(self.path, "a") as f:
for row in rows:
f.write(str(row) + "\n")
return MyCommitMessage(num_rows=len(rows))
def commit(self, messages, batchId):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed batch {batchId} with {total} rows")
def abort(self, messages, batchId):
print(f"Batch {batchId} failed, performing cleanup")