Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Classe de base pour les lecteurs de source de données.
Les lecteurs de sources de données sont responsables de la sortie des données d’une source de données. Implémentez cette classe et retournez une instance pour rendre une source de DataSource.reader() données lisible.
Syntaxe
from pyspark.sql.datasource import DataSourceReader
class MyDataSourceReader(DataSourceReader):
def read(self, partition):
...
Méthodes
| Méthode | Description |
|---|---|
pushFilters(filters) |
Appelé avec la liste des filtres qui peuvent être poussés vers la source de données. Retourne une itérable de filtres qui doivent toujours être évalués par Spark. Par défaut, retourne tous les filtres, indiquant qu’aucun filtre n’est poussé vers le bas.
pushFilters() est autorisé à modifier self. L’objet doit rester picklable après modification. Les modifications à self apporter sont visibles et partitions()read(). |
partitions() |
Retourne une séquence d’objets InputPartition qui fractionne les données lues en tâches parallèles. Par défaut, retourne une partition unique. Remplacez les performances lors de la lecture de jeux de données volumineux. Toutes les valeurs de partition retournées par partitions() doivent être des objets sélectionnables. |
read(partition) |
Génère des données pour une partition donnée et retourne un itérateur d’objets tuples, lignes ou PyArrow RecordBatch . Chaque tuple ou ligne est converti en ligne dans le DataFrame final. Cette méthode est abstraite et doit être implémentée. |
Exemples
Implémentez un lecteur de base qui retourne des lignes à partir d’une liste de partitions :
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
class MyDataSourceReader(DataSourceReader):
def partitions(self):
return [InputPartition(1), InputPartition(2), InputPartition(3)]
def read(self, partition):
yield (partition.value, 0)
yield (partition.value, 1)
Retourner des lignes à l’aide de PyArrow RecordBatch:
class MyDataSourceReader(DataSourceReader):
def read(self, partition):
import pyarrow as pa
data = {
"partition": [partition.value] * 2,
"value": [0, 1]
}
table = pa.Table.from_pydict(data)
for batch in table.to_batches():
yield batch
Implémentez le pushdown de filtre pour prendre en charge EqualTo les filtres :
from pyspark.sql.datasource import DataSourceReader, EqualTo
class MyDataSourceReader(DataSourceReader):
def __init__(self):
self.filters = []
def pushFilters(self, filters):
for f in filters:
if isinstance(f, EqualTo):
self.filters.append(f)
else:
yield f
def read(self, partition):
...