Partekatu honen bidez:


DataSourceReader

Clase base para lectores de orígenes de datos.

Los lectores de orígenes de datos son responsables de generar datos de un origen de datos. Implemente esta clase y devuelva una instancia de para que un origen de DataSource.reader() datos sea legible.

Sintaxis

from pyspark.sql.datasource import DataSourceReader

class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        ...

Methods

Método Descripción
pushFilters(filters) Se llama con la lista de filtros que se pueden insertar en el origen de datos. Devuelve una iteración de filtros que Spark todavía debe evaluar. De forma predeterminada, devuelve todos los filtros, lo que indica que no se inserta ningún filtro. pushFilters() se permite modificar self. El objeto debe permanecer seleccionable después de la modificación. Los cambios en self son visibles para partitions() y read().
partitions() Devuelve una secuencia de objetos que dividen la lectura de InputPartition datos en tareas paralelas. De forma predeterminada, devuelve una sola partición. Invalide para mejorar el rendimiento al leer grandes conjuntos de datos. Todos los valores de partición devueltos por partitions() deben ser objetos seleccionables.
read(partition) Genera datos para una partición determinada y devuelve un iterador de tuplas, filas o objetos PyArrow RecordBatch . Cada tupla o fila se convierte en una fila en el dataframe final. Este método es abstracto y debe implementarse.

Ejemplos

Implemente un lector básico que devuelva filas de una lista de particiones:

from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition

class MyDataSourceReader(DataSourceReader):
    def partitions(self):
        return [InputPartition(1), InputPartition(2), InputPartition(3)]

    def read(self, partition):
        yield (partition.value, 0)
        yield (partition.value, 1)

Devolver filas mediante PyArrow RecordBatch:

class MyDataSourceReader(DataSourceReader):
    def read(self, partition):
        import pyarrow as pa
        data = {
            "partition": [partition.value] * 2,
            "value": [0, 1]
        }
        table = pa.Table.from_pydict(data)
        for batch in table.to_batches():
            yield batch

Implemente la inserción de filtros para admitir EqualTo filtros:

from pyspark.sql.datasource import DataSourceReader, EqualTo

class MyDataSourceReader(DataSourceReader):
    def __init__(self):
        self.filters = []

    def pushFilters(self, filters):
        for f in filters:
            if isinstance(f, EqualTo):
                self.filters.append(f)
            else:
                yield f

    def read(self, partition):
        ...