Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Eine Basisklasse für Datenquellenleser.
Datenquellenleser sind für die Ausgabe von Daten aus einer Datenquelle verantwortlich. Implementieren Sie diese Klasse, und geben Sie eine Instanz zurück DataSource.reader() , um eine Datenquelle lesbar zu machen.
Syntax
from pyspark.sql.datasource import DataSourceReader
class MyDataSourceReader(DataSourceReader):
def read(self, partition):
...
Methodik
| Methode | Beschreibung |
|---|---|
pushFilters(filters) |
Wird mit der Liste der Filter aufgerufen, die an die Datenquelle übertragen werden können. Gibt eine Iterable von Filtern zurück, die noch von Spark ausgewertet werden müssen. Gibt standardmäßig alle Filter zurück, die angeben, dass keine Filter nach unten verschoben werden.
pushFilters() kann geändert werden self. Das Objekt muss nach änderungsfähig bleiben. Änderungen, an denen self sichtbar partitions() sind, und read(). |
partitions() |
Gibt eine Abfolge von Objekten zurück, die das Lesen von InputPartition Daten in parallele Vorgänge aufteilen. Gibt standardmäßig eine einzelne Partition zurück. Überschreiben Sie beim Lesen großer Datasets eine bessere Leistung. Alle von ihnen zurückgegebenen partitions() Partitionswerte müssen auswählbare Objekte sein. |
read(partition) |
Generiert Daten für eine bestimmte Partition und gibt einen Iterator von Tupeln, Zeilen oder PyArrow-Objekten RecordBatch zurück. Jedes Tupel oder jede Zeile wird in eine Zeile im endgültigen DataFrame konvertiert. Diese Methode ist abstrakt und muss implementiert werden. |
Beispiele
Implementieren Sie einen einfachen Reader, der Zeilen aus einer Liste von Partitionen zurückgibt:
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
class MyDataSourceReader(DataSourceReader):
def partitions(self):
return [InputPartition(1), InputPartition(2), InputPartition(3)]
def read(self, partition):
yield (partition.value, 0)
yield (partition.value, 1)
Zurückgeben von Zeilen mithilfe von PyArrow RecordBatch:
class MyDataSourceReader(DataSourceReader):
def read(self, partition):
import pyarrow as pa
data = {
"partition": [partition.value] * 2,
"value": [0, 1]
}
table = pa.Table.from_pydict(data)
for batch in table.to_batches():
yield batch
Implementieren Sie Filter-Pushdown, um Filter zu unterstützen EqualTo :
from pyspark.sql.datasource import DataSourceReader, EqualTo
class MyDataSourceReader(DataSourceReader):
def __init__(self):
self.filters = []
def pushFilters(self, filters):
for f in filters:
if isinstance(f, EqualTo):
self.filters.append(f)
else:
yield f
def read(self, partition):
...