Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Clase base para lectores de orígenes de datos.
Los lectores de orígenes de datos son responsables de generar datos de un origen de datos. Implemente esta clase y devuelva una instancia de para que un origen de DataSource.reader() datos sea legible.
Sintaxis
from pyspark.sql.datasource import DataSourceReader
class MyDataSourceReader(DataSourceReader):
def read(self, partition):
...
Methods
| Método | Descripción |
|---|---|
pushFilters(filters) |
Se llama con la lista de filtros que se pueden insertar en el origen de datos. Devuelve una iteración de filtros que Spark todavía debe evaluar. De forma predeterminada, devuelve todos los filtros, lo que indica que no se inserta ningún filtro.
pushFilters() se permite modificar self. El objeto debe permanecer seleccionable después de la modificación. Los cambios en self son visibles para partitions() y read(). |
partitions() |
Devuelve una secuencia de objetos que dividen la lectura de InputPartition datos en tareas paralelas. De forma predeterminada, devuelve una sola partición. Invalide para mejorar el rendimiento al leer grandes conjuntos de datos. Todos los valores de partición devueltos por partitions() deben ser objetos seleccionables. |
read(partition) |
Genera datos para una partición determinada y devuelve un iterador de tuplas, filas o objetos PyArrow RecordBatch . Cada tupla o fila se convierte en una fila en el dataframe final. Este método es abstracto y debe implementarse. |
Ejemplos
Implemente un lector básico que devuelva filas de una lista de particiones:
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
class MyDataSourceReader(DataSourceReader):
def partitions(self):
return [InputPartition(1), InputPartition(2), InputPartition(3)]
def read(self, partition):
yield (partition.value, 0)
yield (partition.value, 1)
Devolver filas mediante PyArrow RecordBatch:
class MyDataSourceReader(DataSourceReader):
def read(self, partition):
import pyarrow as pa
data = {
"partition": [partition.value] * 2,
"value": [0, 1]
}
table = pa.Table.from_pydict(data)
for batch in table.to_batches():
yield batch
Implemente la inserción de filtros para admitir EqualTo filtros:
from pyspark.sql.datasource import DataSourceReader, EqualTo
class MyDataSourceReader(DataSourceReader):
def __init__(self):
self.filters = []
def pushFilters(self, filters):
for f in filters:
if isinstance(f, EqualTo):
self.filters.append(f)
else:
yield f
def read(self, partition):
...