Dela via


DataSourceStreamArrowWriter

En basklass för dataströmsskrivare som bearbetar data med PyArrows RecordBatch.

Till skillnad från DataSourceStreamWriter, som fungerar med en iterator av Spark-objekt Row , optimeras den här klassen för pilformatet när strömmande data skrivs. Det kan ge bättre prestanda när du interagerar med system eller bibliotek som internt stöder Arrow för användningsfall för direktuppspelning. Implementera den här klassen och returnera en instans från DataSource.streamWriter() för att göra en datakälla skrivbar som en direktuppspelningsmottagare med hjälp av Arrow.

Syntax

from pyspark.sql.datasource import DataSourceStreamArrowWriter

class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
    def write(self, iterator):
        ...

Methods

Metod Beskrivning
write(iterator) Skriver en iterator av PyArrow-objekt RecordBatch till strömningsmottagaren. Anropade utförare en gång per mikrobatch. Returnerar ett WriterCommitMessage, eller None om det inte finns något incheckningsmeddelande. Den här metoden är abstrakt och måste implementeras.
commit(messages, batchId) Genomför mikrobatchen med hjälp av en lista över incheckningsmeddelanden som samlats in från alla utförare. Anropas på drivrutinen när alla uppgifter i mikrobatchen körs. Ärvd från DataSourceStreamWriter.
abort(messages, batchId) Avbryter mikrobatchen med hjälp av en lista över incheckningsmeddelanden som samlats in från alla utförare. Anropas på drivrutinen när en eller flera uppgifter i mikrobatchen misslyckades. Ärvd från DataSourceStreamWriter.

Notes

  • Drivrutinen samlar in incheckningsmeddelanden från alla utförare och skickar dem till commit() om alla uppgifter lyckas, eller till abort() om någon uppgift misslyckas.
  • Om en skrivuppgift misslyckas finns dess incheckningsmeddelande i listan som skickas None till commit() eller abort().
  • batchId identifierar varje mikrobatch och ökar med 1 med varje bearbetad mikrobatch.

Exempel

Implementera en Pilbaserad strömskrivare som räknar rader per mikrobatch:

from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamArrowWriter, WriterCommitMessage

@dataclass
class MyCommitMessage(WriterCommitMessage):
    num_rows: int

class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
    def write(self, iterator):
        total_rows = 0
        for batch in iterator:
            total_rows += len(batch)
        return MyCommitMessage(num_rows=total_rows)

    def commit(self, messages, batchId):
        total = sum(m.num_rows for m in messages if m is not None)
        print(f"Committed batch {batchId} with {total} rows")

    def abort(self, messages, batchId):
        print(f"Batch {batchId} failed, performing cleanup")