Share via


DataSourceStreamReader

Een basisklasse voor lezers van streaminggegevensbronnen.

Gegevensbronstreamlezers zijn verantwoordelijk voor het uitvoeren van gegevens uit een streaminggegevensbron. Implementeer deze klasse en retourneer een exemplaar van DataSource.streamReader() waaruit een gegevensbron kan worden gelezen als een streamingbron.

Syntaxis

from pyspark.sql.datasource import DataSourceStreamReader

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        ...

    def partitions(self, start, end):
        ...

    def read(self, partition):
        ...

Methods

Methode Beschrijving
initialOffset() Retourneert de eerste verschuiving van de streaminggegevensbron als een dict. Een nieuwe streamingquery begint met lezen vanaf deze offset. Opnieuw gestarte query's worden hervat vanuit de controlepunt-offset.
partitions(start, end) Retourneert een reeks InputPartition objecten die de gegevens tussen start en end offsets vertegenwoordigen. Retourneert een lege reeks als start deze gelijk is endaan .
read(partition) Genereert gegevens voor een bepaalde partitie en retourneert een iterator van tuples, rijen of PyArrow-objecten RecordBatch . Elke tuple of rij wordt geconverteerd naar een rij in het uiteindelijke DataFrame. Deze methode is abstract en moet worden geïmplementeerd.
commit(end) Informeert de bron dat Spark alle gegevens heeft verwerkt voor offsets kleiner dan of gelijk aan end. Spark vraagt alleen offsets aan die groter zijn dan end in de toekomst.
stop() Hiermee stopt u de bron en worden alle resources vrijgemaakt die deze heeft toegewezen. Aangeroepen wanneer de streamingquery wordt beëindigd.

Aantekeningen

  • read() is statisch en staatloos. Geen toegang tot onveranderbare klasseleden of de status in het geheugen behouden tussen verschillende aanroepen van read().
  • Alle partitiewaarden die worden geretourneerd door partitions() moeten picklable objecten zijn.
  • Verschuivingen worden weergegeven als een dict of recursieve dict waarvan sleutels en waarden primitieve typen zijn: geheel getal, tekenreeks of booleaanse waarde.

Examples

Implementeer een streaminglezer die wordt gelezen uit een reeks geïndexeerde records:

from pyspark.sql.datasource import (
    DataSource,
    DataSourceStreamReader,
    InputPartition,
)

class MyDataSourceStreamReader(DataSourceStreamReader):
    def initialOffset(self):
        return {"index": 0}

    def latestOffset(self, start, limit):
        return {"index": start["index"] + 10}

    def partitions(self, start, end):
        return [
            InputPartition(i)
            for i in range(start["index"], end["index"])
        ]

    def read(self, partition):
        yield (partition.value, f"record-{partition.value}")

    def commit(self, end):
        print(f"Committed up to offset {end}")

    def stop(self):
        print("Stopping stream reader")