Share via


DataSourceArrowWriter

Een basisklasse voor schrijvers van gegevensbronnen die gegevens verwerken met behulp van PyArrow RecordBatch.

In tegenstelling tot DataSourceWriter, wat werkt met een iterator van Spark-objecten Row , is deze klasse geoptimaliseerd voor de indeling Pijl bij het schrijven van gegevens. Het kan betere prestaties bieden bij communicatie met systemen of bibliotheken die systeemeigen ondersteuning bieden voor Arrow. Implementeer deze klasse en retourneer een exemplaar van DataSource.writer() waaruit een gegevensbron schrijfbaar is met behulp van Pijl.

Syntaxis

from pyspark.sql.datasource import DataSourceArrowWriter

class MyDataSourceArrowWriter(DataSourceArrowWriter):
    def write(self, iterator):
        ...

Methods

Methode Beschrijving
write(iterator) Hiermee schrijft u een iterator van PyArrow-objecten RecordBatch naar de sink. Wordt eenmaal aangeroepen op elke uitvoerder. Retourneert een WriterCommitMessage, of None als er geen doorvoerbericht is. Deze methode is abstract en moet worden geïmplementeerd.
commit(messages) Voert de schrijftaak door met behulp van een lijst met doorvoerberichten die zijn verzameld van alle uitvoerders. Aangeroepen op het stuurprogramma wanneer alle taken zijn uitgevoerd. Overgenomen van DataSourceWriter.
abort(messages) Hiermee wordt de schrijftaak afgebroken met behulp van een lijst met doorvoerberichten die zijn verzameld van alle uitvoerders. Aangeroepen op het stuurprogramma wanneer een of meer taken zijn mislukt. Overgenomen van DataSourceWriter.

Aantekeningen

  • Het stuurprogramma verzamelt doorvoerberichten van alle uitvoerders en geeft deze door aan commit() als alle taken slagen of aan abort() of een taak mislukt.
  • Als een schrijftaak mislukt, wordt het doorvoerbericht weergegeven in de lijst die wordt None doorgegeven aan commit() of abort().

Examples

Implementeer een op pijl gebaseerde schrijver waarmee rijen in alle batches worden geteld:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceArrowWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceArrowWriter(DataSourceArrowWriter):
    def write(self, iterator):
        total_rows = 0
        for batch in iterator:
            total_rows += len(batch)
        return MyCommitMessage(num_rows=total_rows)

    def commit(self, messages):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed {total} rows")

    def abort(self, messages):
        print("Write job failed, performing cleanup")