Delen via


DataSourceReader

Een basisklasse voor lezers van gegevensbronnen.

Lezers van gegevensbronnen zijn verantwoordelijk voor het uitvoeren van gegevens uit een gegevensbron. Implementeer deze klasse en retourneer een exemplaar van DataSource.reader() waaruit een gegevensbron kan worden gelezen.

Syntaxis

from pyspark.sql.datasource import DataSourceReader

class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        ...

Methods

Methode Beschrijving
pushFilters(filters) Aangeroepen met de lijst met filters die naar de gegevensbron kunnen worden gepusht. Hiermee wordt een itereerbaar filter geretourneerd dat nog steeds door Spark moet worden geëvalueerd. Retourneert standaard alle filters, wat aangeeft dat er geen filters omlaag worden gepusht. pushFilters() mag worden gewijzigd self. Het object moet na wijziging picklable blijven. Wijzigingen die self zichtbaar zijn voor partitions() en read().
partitions() Retourneert een reeks InputPartition objecten die gegevens splitsen die in parallelle taken worden gelezen. Retourneert standaard één partitie. Overschrijven voor betere prestaties bij het lezen van grote gegevenssets. Alle partitiewaarden die worden geretourneerd door partitions() moeten picklable objecten zijn.
read(partition) Genereert gegevens voor een bepaalde partitie en retourneert een iterator van tuples, rijen of PyArrow-objecten RecordBatch . Elke tuple of rij wordt geconverteerd naar een rij in het uiteindelijke DataFrame. Deze methode is abstract en moet worden geïmplementeerd.

Examples

Implementeer een eenvoudige lezer die rijen retourneert uit een lijst met partities:

from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

class MyDataSourceReader(DataSourceReader):
    def partitions(self):
        return [InputPartition(1), InputPartition(2), InputPartition(3)]

    def read(self, partition):
        yield (partition.value, 0)
        yield (partition.value, 1)

Rijen retourneren met Behulp van PyArrow RecordBatch:

class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        import pyarrow as pa
        data = {
            "partition": [partition.value] * 2,
            "value": [0, 1]
        }
        table = pa.Table.from_pydict(data)
        for batch in table.to_batches():
            yield batch

Filterpushdown implementeren ter ondersteuning van EqualTo filters:

from pyspark.sql.datasource import DataSourceReader, EqualTo

class MyDataSourceReader(DataSourceReader):
    def __init__(self):
        self.filters = []

    def pushFilters(self, filters):
        for f in filters:
            if isinstance(f, EqualTo):
                self.filters.append(f)
            else:
                yield f

    def read(self, partition):
        ...