Partager via


DataSourceReader

Classe de base pour les lecteurs de source de données.

Les lecteurs de sources de données sont responsables de la sortie des données d’une source de données. Implémentez cette classe et retournez une instance pour rendre une source de DataSource.reader() données lisible.

Syntaxe

from pyspark.sql.datasource import DataSourceReader

class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        ...

Méthodes

Méthode Description
pushFilters(filters) Appelé avec la liste des filtres qui peuvent être poussés vers la source de données. Retourne une itérable de filtres qui doivent toujours être évalués par Spark. Par défaut, retourne tous les filtres, indiquant qu’aucun filtre n’est poussé vers le bas. pushFilters() est autorisé à modifier self. L’objet doit rester picklable après modification. Les modifications à self apporter sont visibles et partitions()read().
partitions() Retourne une séquence d’objets InputPartition qui fractionne les données lues en tâches parallèles. Par défaut, retourne une partition unique. Remplacez les performances lors de la lecture de jeux de données volumineux. Toutes les valeurs de partition retournées par partitions() doivent être des objets sélectionnables.
read(partition) Génère des données pour une partition donnée et retourne un itérateur d’objets tuples, lignes ou PyArrow RecordBatch . Chaque tuple ou ligne est converti en ligne dans le DataFrame final. Cette méthode est abstraite et doit être implémentée.

Exemples

Implémentez un lecteur de base qui retourne des lignes à partir d’une liste de partitions :

from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

class MyDataSourceReader(DataSourceReader):
    def partitions(self):
        return [InputPartition(1), InputPartition(2), InputPartition(3)]

    def read(self, partition):
        yield (partition.value, 0)
        yield (partition.value, 1)

Retourner des lignes à l’aide de PyArrow RecordBatch:

class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        import pyarrow as pa
        data = {
            "partition": [partition.value] * 2,
            "value": [0, 1]
        }
        table = pa.Table.from_pydict(data)
        for batch in table.to_batches():
            yield batch

Implémentez le pushdown de filtre pour prendre en charge EqualTo les filtres :

from pyspark.sql.datasource import DataSourceReader, EqualTo

class MyDataSourceReader(DataSourceReader):
    def __init__(self):
        self.filters = []

    def pushFilters(self, filters):
        for f in filters:
            if isinstance(f, EqualTo):
                self.filters.append(f)
            else:
                yield f

    def read(self, partition):
        ...