Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of mappen te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen om mappen te wijzigen.
Een basisklasse voor schrijvers van gegevensbronnen die gegevens verwerken met behulp van PyArrow RecordBatch.
In tegenstelling tot DataSourceWriter, wat werkt met een iterator van Spark-objecten Row , is deze klasse geoptimaliseerd voor de indeling Pijl bij het schrijven van gegevens. Het kan betere prestaties bieden bij communicatie met systemen of bibliotheken die systeemeigen ondersteuning bieden voor Arrow. Implementeer deze klasse en retourneer een exemplaar van DataSource.writer() waaruit een gegevensbron schrijfbaar is met behulp van Pijl.
Syntaxis
from pyspark.sql.datasource import DataSourceArrowWriter
class MyDataSourceArrowWriter(DataSourceArrowWriter):
def write(self, iterator):
...
Methods
| Methode | Beschrijving |
|---|---|
write(iterator) |
Hiermee schrijft u een iterator van PyArrow-objecten RecordBatch naar de sink. Wordt eenmaal aangeroepen op elke uitvoerder. Retourneert een WriterCommitMessage, of None als er geen doorvoerbericht is. Deze methode is abstract en moet worden geïmplementeerd. |
commit(messages) |
Voert de schrijftaak door met behulp van een lijst met doorvoerberichten die zijn verzameld van alle uitvoerders. Aangeroepen op het stuurprogramma wanneer alle taken zijn uitgevoerd. Overgenomen van DataSourceWriter. |
abort(messages) |
Hiermee wordt de schrijftaak afgebroken met behulp van een lijst met doorvoerberichten die zijn verzameld van alle uitvoerders. Aangeroepen op het stuurprogramma wanneer een of meer taken zijn mislukt. Overgenomen van DataSourceWriter. |
Aantekeningen
- Het stuurprogramma verzamelt doorvoerberichten van alle uitvoerders en geeft deze door aan
commit()als alle taken slagen of aanabort()of een taak mislukt. - Als een schrijftaak mislukt, wordt het doorvoerbericht weergegeven in de lijst die wordt
Nonedoorgegeven aancommit()ofabort().
Examples
Implementeer een op pijl gebaseerde schrijver waarmee rijen in alle batches worden geteld:
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceArrowWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceArrowWriter(DataSourceArrowWriter):
def write(self, iterator):
total_rows = 0
for batch in iterator:
total_rows += len(batch)
return MyCommitMessage(num_rows=total_rows)
def commit(self, messages):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed {total} rows")
def abort(self, messages):
print("Write job failed, performing cleanup")