Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Classe di base per i lettori dell'origine dati.
I lettori dell'origine dati sono responsabili dell'output dei dati da un'origine dati. Implementare questa classe e restituire un'istanza da DataSource.reader() per rendere leggibile un'origine dati.
Sintassi
from pyspark.sql.datasource import DataSourceReader
class MyDataSourceReader(DataSourceReader):
def read(self, partition):
...
Methods
| metodo | Descrizione |
|---|---|
pushFilters(filters) |
Chiamato con l'elenco di filtri che possono essere inseriti nell'origine dati. Restituisce un'iterabile di filtri che devono comunque essere valutati da Spark. Per impostazione predefinita, restituisce tutti i filtri, a indicare che non viene eseguito il push dei filtri.
pushFilters() è autorizzato a modificare self. L'oggetto deve rimanere selezionabile dopo la modifica. Le modifiche apportate a self sono visibili a partitions() e read(). |
partitions() |
Restituisce una sequenza di InputPartition oggetti che suddivideno i dati in attività parallele. Per impostazione predefinita, restituisce una singola partizione. Eseguire l'override per ottenere prestazioni migliori durante la lettura di set di dati di grandi dimensioni. Tutti i valori di partizione restituiti da partitions() devono essere oggetti selezionabili. |
read(partition) |
Genera dati per una determinata partizione e restituisce un iteratore di tuple, righe o oggetti PyArrow RecordBatch . Ogni tupla o riga viene convertita in una riga nel dataframe finale. Questo metodo è astratto e deve essere implementato. |
Examples
Implementare un lettore di base che restituisce righe da un elenco di partizioni:
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
class MyDataSourceReader(DataSourceReader):
def partitions(self):
return [InputPartition(1), InputPartition(2), InputPartition(3)]
def read(self, partition):
yield (partition.value, 0)
yield (partition.value, 1)
Restituire righe usando PyArrow RecordBatch:
class MyDataSourceReader(DataSourceReader):
def read(self, partition):
import pyarrow as pa
data = {
"partition": [partition.value] * 2,
"value": [0, 1]
}
table = pa.Table.from_pydict(data)
for batch in table.to_batches():
yield batch
Implementare il pushdown del filtro per supportare EqualTo i filtri:
from pyspark.sql.datasource import DataSourceReader, EqualTo
class MyDataSourceReader(DataSourceReader):
def __init__(self):
self.filters = []
def pushFilters(self, filters):
for f in filters:
if isinstance(f, EqualTo):
self.filters.append(f)
else:
yield f
def read(self, partition):
...