Freigeben über


DataSourceStreamReader

Eine Basisklasse für Streamingdatenquellenleser.

Datenquellendatenstromleser sind für die Ausgabe von Daten aus einer Streamingdatenquelle verantwortlich. Implementieren Sie diese Klasse, und geben Sie eine Instanz zurück DataSource.streamReader() , um eine Datenquelle als Streamingquelle lesbar zu machen.

Syntax

from pyspark.sql.datasource import DataSourceStreamReader

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        ...

    def partitions(self, start, end):
        ...

    def read(self, partition):
        ...

Methodik

Methode Beschreibung
initialOffset() Gibt den anfänglichen Offset der Streamingdatenquelle als Diktat zurück. Eine neue Streamingabfrage beginnt mit dem Lesen aus diesem Offset. Neu gestartete Abfragen werden stattdessen aus dem überprüften Offset fortgesetzt.
partitions(start, end) Gibt eine Abfolge von InputPartition Objekten zurück, die die Daten zwischen start und end offsets darstellen. Gibt eine leere Sequenz zurück, wenn start dies gleich ist end.
read(partition) Generiert Daten für eine bestimmte Partition und gibt einen Iterator von Tupeln, Zeilen oder PyArrow-Objekten RecordBatch zurück. Jedes Tupel oder jede Zeile wird in eine Zeile im endgültigen DataFrame konvertiert. Diese Methode ist abstrakt und muss implementiert werden.
commit(end) Informiert die Quelle, dass Spark die Verarbeitung aller Daten für Offsets abgeschlossen hat, die kleiner oder gleich sind end. Spark fordert nur Offsets an, die höher sind als end in der Zukunft.
stop() Beendet die Quelle und gibt alle ressourcen frei, die sie zugeordnet hat. Wird aufgerufen, wenn die Streamingabfrage beendet wird.

Hinweise

  • read() ist statisch und zustandslos. Greifen Sie nicht auf änderbare Klassenmember zu oder behalten Sie den Speicherstatus zwischen verschiedenen Aufrufen von read().
  • Alle von ihnen zurückgegebenen partitions() Partitionswerte müssen auswählbare Objekte sein.
  • Offsets werden als Diktat oder rekursives Diktat dargestellt, dessen Schlüssel und Werte grundtyptyp sind: ganze Zahl, Zeichenfolge oder boolesche Werte.

Beispiele

Implementieren eines Streaminglesers, der aus einer Sequenz von indizierten Datensätzen liest:

from pyspark.sql.datasource import (
    DataSource,
    DataSourceStreamReader,
    InputPartition,
)

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        return {"index": 0}

    def latestOffset(self, start, limit):
        return {"index": start["index"] + 10}

    def partitions(self, start, end):
        return [
            InputPartition(i)
            for i in range(start["index"], end["index"])
        ]

    def read(self, partition):
        yield (partition.value, f"record-{partition.value}")

    def commit(self, end):
        print(f"Committed up to offset {end}")

    def stop(self):
        print("Stopping stream reader")