Freigeben über


DataSourceReader

Eine Basisklasse für Datenquellenleser.

Datenquellenleser sind für die Ausgabe von Daten aus einer Datenquelle verantwortlich. Implementieren Sie diese Klasse, und geben Sie eine Instanz zurück DataSource.reader() , um eine Datenquelle lesbar zu machen.

Syntax

from pyspark.sql.datasource import DataSourceReader

class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        ...

Methodik

Methode Beschreibung
pushFilters(filters) Wird mit der Liste der Filter aufgerufen, die an die Datenquelle übertragen werden können. Gibt eine Iterable von Filtern zurück, die noch von Spark ausgewertet werden müssen. Gibt standardmäßig alle Filter zurück, die angeben, dass keine Filter nach unten verschoben werden. pushFilters() kann geändert werden self. Das Objekt muss nach änderungsfähig bleiben. Änderungen, an denen self sichtbar partitions() sind, und read().
partitions() Gibt eine Abfolge von Objekten zurück, die das Lesen von InputPartition Daten in parallele Vorgänge aufteilen. Gibt standardmäßig eine einzelne Partition zurück. Überschreiben Sie beim Lesen großer Datasets eine bessere Leistung. Alle von ihnen zurückgegebenen partitions() Partitionswerte müssen auswählbare Objekte sein.
read(partition) Generiert Daten für eine bestimmte Partition und gibt einen Iterator von Tupeln, Zeilen oder PyArrow-Objekten RecordBatch zurück. Jedes Tupel oder jede Zeile wird in eine Zeile im endgültigen DataFrame konvertiert. Diese Methode ist abstrakt und muss implementiert werden.

Beispiele

Implementieren Sie einen einfachen Reader, der Zeilen aus einer Liste von Partitionen zurückgibt:

from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

class MyDataSourceReader(DataSourceReader):
    def partitions(self):
        return [InputPartition(1), InputPartition(2), InputPartition(3)]

    def read(self, partition):
        yield (partition.value, 0)
        yield (partition.value, 1)

Zurückgeben von Zeilen mithilfe von PyArrow RecordBatch:

class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        import pyarrow as pa
        data = {
            "partition": [partition.value] * 2,
            "value": [0, 1]
        }
        table = pa.Table.from_pydict(data)
        for batch in table.to_batches():
            yield batch

Implementieren Sie Filter-Pushdown, um Filter zu unterstützen EqualTo :

from pyspark.sql.datasource import DataSourceReader, EqualTo

class MyDataSourceReader(DataSourceReader):
    def __init__(self):
        self.filters = []

    def pushFilters(self, filters):
        for f in filters:
            if isinstance(f, EqualTo):
                self.filters.append(f)
            else:
                yield f

    def read(self, partition):
        ...