Share via


DataSourceStreamArrowWriter

Een basisklasse voor schrijvers van gegevensstromen die gegevens verwerken met behulp van PyArrow RecordBatch.

In tegenstelling tot DataSourceStreamWriter, wat werkt met een iterator van Spark-objecten Row , is deze klasse geoptimaliseerd voor de indeling Pijl bij het schrijven van streaminggegevens. Het kan betere prestaties bieden bij communicatie met systemen of bibliotheken die systeemeigen ondersteuning bieden voor Arrow voor gebruiksvoorbeelden voor streaming. Implementeer deze klasse en retourneer een exemplaar van DataSource.streamWriter() waaruit een gegevensbron kan worden geschreven als een streaming-sink met behulp van Arrow.

Syntaxis

from pyspark.sql.datasource import DataSourceStreamArrowWriter

class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
    def write(self, iterator):
        ...

Methods

Methode Beschrijving
write(iterator) Hiermee schrijft u een iterator van PyArrow-objecten RecordBatch naar de streaming-sink. Aangeroepen op uitvoerders eenmaal per microbatch. Retourneert een WriterCommitMessage, of None als er geen doorvoerbericht is. Deze methode is abstract en moet worden geïmplementeerd.
commit(messages, batchId) Voert de microbatch door met behulp van een lijst met doorvoerberichten die zijn verzameld van alle uitvoerders. Aangeroepen op het stuurprogramma wanneer alle taken in de microbatch worden uitgevoerd. Overgenomen van DataSourceStreamWriter.
abort(messages, batchId) Hiermee wordt de microbatch afgebroken met behulp van een lijst met doorvoerberichten die zijn verzameld van alle uitvoerders. Aangeroepen op het stuurprogramma wanneer een of meer taken in de microbatch zijn mislukt. Overgenomen van DataSourceStreamWriter.

Aantekeningen

  • Het stuurprogramma verzamelt doorvoerberichten van alle uitvoerders en geeft deze door aan commit() als alle taken slagen of aan abort() of een taak mislukt.
  • Als een schrijftaak mislukt, wordt het doorvoerbericht weergegeven in de lijst die wordt None doorgegeven aan commit() of abort().
  • batchId identificeert elke microbatch en incrementeert met 1 bij elke verwerkte microbatch.

Examples

Implementeer een stroomschrijver op basis van pijlen waarmee rijen per microbatch worden geteld:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamArrowWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
    def write(self, iterator):
        total_rows = 0
        for batch in iterator:
            total_rows += len(batch)
        return MyCommitMessage(num_rows=total_rows)

    def commit(self, messages, batchId):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed batch {batchId} with {total} rows")

    def abort(self, messages, batchId):
        print(f"Batch {batchId} failed, performing cleanup")