Condividi tramite


DataSourceStreamReader

Classe di base per i lettori dell'origine dati di streaming.

I lettori del flusso di origine dati sono responsabili dell'output dei dati da un'origine dati di streaming. Implementare questa classe e restituire un'istanza da DataSource.streamReader() per rendere leggibile un'origine dati come origine di streaming.

Sintassi

from pyspark.sql.datasource import DataSourceStreamReader

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        ...

    def partitions(self, start, end):
        ...

    def read(self, partition):
        ...

Methods

metodo Descrizione
initialOffset() Restituisce l'offset iniziale dell'origine dati di streaming come elemento dict. Una nuova query di streaming inizia a leggere da questo offset. Le query riavviate riprendono dall'offset con checkpoint.
partitions(start, end) Restituisce una sequenza di InputPartition oggetti che rappresentano i dati tra start e end gli offset. Restituisce una sequenza vuota se start è uguale a end.
read(partition) Genera dati per una determinata partizione e restituisce un iteratore di tuple, righe o oggetti PyArrow RecordBatch . Ogni tupla o riga viene convertita in una riga nel dataframe finale. Questo metodo è astratto e deve essere implementato.
commit(end) Informa l'origine che Spark ha completato l'elaborazione di tutti i dati per gli offset minori o uguali a end. Spark richiederà solo offset maggiori di quelli end futuri.
stop() Arresta l'origine e libera tutte le risorse allocate. Richiamato quando la query di streaming termina.

Note

  • read() è statico e senza stato. Non accedere ai membri della classe modificabili o mantenere lo stato in memoria tra chiamate diverse di read().
  • Tutti i valori di partizione restituiti da partitions() devono essere oggetti selezionabili.
  • Gli offset sono rappresentati come dict o dict ricorsivo i cui valori e chiavi sono tipi primitivi: integer, string o boolean.

Examples

Implementare un lettore di streaming che legge da una sequenza di record indicizzati:

from pyspark.sql.datasource import (
    DataSource,
    DataSourceStreamReader,
    InputPartition,
)

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        return {"index": 0}

    def latestOffset(self, start, limit):
        return {"index": start["index"] + 10}

    def partitions(self, start, end):
        return [
            InputPartition(i)
            for i in range(start["index"], end["index"])
        ]

    def read(self, partition):
        yield (partition.value, f"record-{partition.value}")

    def commit(self, end):
        print(f"Committed up to offset {end}")

    def stop(self):
        print("Stopping stream reader")