Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Kelas dasar untuk pembaca sumber data.
Pembaca sumber data bertanggung jawab untuk menghasilkan data dari sumber data. Terapkan kelas ini dan kembalikan instans dari DataSource.reader() untuk membuat sumber data dapat dibaca.
Sintaksis
from pyspark.sql.datasource import DataSourceReader
class MyDataSourceReader(DataSourceReader):
def read(self, partition):
...
Metode
| Metode | Deskripsi |
|---|---|
pushFilters(filters) |
Dipanggil dengan daftar filter yang dapat didorong ke sumber data. Mengembalikan filter yang dapat diulang yang masih perlu dievaluasi oleh Spark. Secara default, mengembalikan semua filter, menunjukkan tidak ada filter yang didorong ke bawah.
pushFilters() diizinkan untuk memodifikasi self. Objek harus tetap dapat dipilih setelah modifikasi.
self Perubahan terlihat ke partitions() dan read(). |
partitions() |
Mengembalikan urutan objek yang membagi pembacaan InputPartition data menjadi tugas paralel. Secara default, mengembalikan satu partisi. Ambil alih untuk performa yang lebih baik saat membaca himpunan data besar. Semua nilai partisi yang dikembalikan oleh partitions() harus berupa objek yang dapat dipilih. |
read(partition) |
Menghasilkan data untuk partisi tertentu dan mengembalikan iterator objek tuple, baris, atau PyArrow RecordBatch . Setiap tuple atau baris dikonversi menjadi baris di DataFrame akhir. Metode ini abstrak dan harus diimplementasikan. |
Examples
Terapkan pembaca dasar yang mengembalikan baris dari daftar partisi:
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
class MyDataSourceReader(DataSourceReader):
def partitions(self):
return [InputPartition(1), InputPartition(2), InputPartition(3)]
def read(self, partition):
yield (partition.value, 0)
yield (partition.value, 1)
Mengembalikan baris menggunakan PyArrow RecordBatch:
class MyDataSourceReader(DataSourceReader):
def read(self, partition):
import pyarrow as pa
data = {
"partition": [partition.value] * 2,
"value": [0, 1]
}
table = pa.Table.from_pydict(data)
for batch in table.to_batches():
yield batch
Terapkan pushdown filter untuk mendukung EqualTo filter:
from pyspark.sql.datasource import DataSourceReader, EqualTo
class MyDataSourceReader(DataSourceReader):
def __init__(self):
self.filters = []
def pushFilters(self, filters):
for f in filters:
if isinstance(f, EqualTo):
self.filters.append(f)
else:
yield f
def read(self, partition):
...