Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of mappen te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen om mappen te wijzigen.
Een basisklasse voor schrijvers van gegevensstromen die gegevens verwerken met behulp van PyArrow RecordBatch.
In tegenstelling tot DataSourceStreamWriter, wat werkt met een iterator van Spark-objecten Row , is deze klasse geoptimaliseerd voor de indeling Pijl bij het schrijven van streaminggegevens. Het kan betere prestaties bieden bij communicatie met systemen of bibliotheken die systeemeigen ondersteuning bieden voor Arrow voor gebruiksvoorbeelden voor streaming. Implementeer deze klasse en retourneer een exemplaar van DataSource.streamWriter() waaruit een gegevensbron kan worden geschreven als een streaming-sink met behulp van Arrow.
Syntaxis
from pyspark.sql.datasource import DataSourceStreamArrowWriter
class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
def write(self, iterator):
...
Methods
| Methode | Beschrijving |
|---|---|
write(iterator) |
Hiermee schrijft u een iterator van PyArrow-objecten RecordBatch naar de streaming-sink. Aangeroepen op uitvoerders eenmaal per microbatch. Retourneert een WriterCommitMessage, of None als er geen doorvoerbericht is. Deze methode is abstract en moet worden geïmplementeerd. |
commit(messages, batchId) |
Voert de microbatch door met behulp van een lijst met doorvoerberichten die zijn verzameld van alle uitvoerders. Aangeroepen op het stuurprogramma wanneer alle taken in de microbatch worden uitgevoerd. Overgenomen van DataSourceStreamWriter. |
abort(messages, batchId) |
Hiermee wordt de microbatch afgebroken met behulp van een lijst met doorvoerberichten die zijn verzameld van alle uitvoerders. Aangeroepen op het stuurprogramma wanneer een of meer taken in de microbatch zijn mislukt. Overgenomen van DataSourceStreamWriter. |
Aantekeningen
- Het stuurprogramma verzamelt doorvoerberichten van alle uitvoerders en geeft deze door aan
commit()als alle taken slagen of aanabort()of een taak mislukt. - Als een schrijftaak mislukt, wordt het doorvoerbericht weergegeven in de lijst die wordt
Nonedoorgegeven aancommit()ofabort(). -
batchIdidentificeert elke microbatch en incrementeert met 1 bij elke verwerkte microbatch.
Examples
Implementeer een stroomschrijver op basis van pijlen waarmee rijen per microbatch worden geteld:
from dataclasses import dataclass
from pyspark.sql.datasource import DataSource, DataSourceStreamArrowWriter, WriterCommitMessage
@dataclass
class MyCommitMessage(WriterCommitMessage):
num_rows: int
class MyDataSourceStreamArrowWriter(DataSourceStreamArrowWriter):
def write(self, iterator):
total_rows = 0
for batch in iterator:
total_rows += len(batch)
return MyCommitMessage(num_rows=total_rows)
def commit(self, messages, batchId):
total = sum(m.num_rows for m in messages if m is not None)
print(f"Committed batch {batchId} with {total} rows")
def abort(self, messages, batchId):
print(f"Batch {batchId} failed, performing cleanup")