一個用於資料來源讀取器的基底類別。
資料來源讀取器負責從資料來源輸出資料。 實作這個類別並回傳一個 DataSource.reader() 實例,讓資料來源變得可讀。
語法
from pyspark.sql.datasource import DataSourceReader
class MyDataSourceReader(DataSourceReader):
def read(self, partition):
...
方法
| 方法 | 說明 |
|---|---|
pushFilters(filters) |
會用可以推送到資料來源的篩選器清單來呼叫。 回傳一個仍需 Spark 評估的篩選器迭代。 預設情況下,會回傳所有過濾器,表示沒有被推下任何過濾器。
pushFilters() 允許修改 self。 該物件在修改後仍需保持可挑剔狀態。 變化 在 self 和 上可見partitions()read()。 |
partitions() |
回傳一串 InputPartition 將資料讀取拆分成平行任務的物件序列。 預設情況下,會回傳單一分割區。 覆蓋以提升讀取大型資料集時的效能。 所有由 回 partitions() 傳的分割值必須是可選取的物件。 |
read(partition) |
為給定分割產生資料,並回傳元組、列或 PyArrow RecordBatch 物件的迭代器。 每個元組或列都會在最終資料框架中轉換成一列。 此方法為抽象且必須實作。 |
Examples
實作一個基本讀取器,能從分割區清單中回傳資料列:
from pyspark.sql.datasource import DataSource, DataSourceReader, InputPartition
class MyDataSourceReader(DataSourceReader):
def partitions(self):
return [InputPartition(1), InputPartition(2), InputPartition(3)]
def read(self, partition):
yield (partition.value, 0)
yield (partition.value, 1)
使用 PyArrow RecordBatch返回資料列:
class MyDataSourceReader(DataSourceReader):
def read(self, partition):
import pyarrow as pa
data = {
"partition": [partition.value] * 2,
"value": [0, 1]
}
table = pa.Table.from_pydict(data)
for batch in table.to_batches():
yield batch
實作濾波器推下以支援 EqualTo 濾波器:
from pyspark.sql.datasource import DataSourceReader, EqualTo
class MyDataSourceReader(DataSourceReader):
def __init__(self):
self.filters = []
def pushFilters(self, filters):
for f in filters:
if isinstance(f, EqualTo):
self.filters.append(f)
else:
yield f
def read(self, partition):
...