在目前的資料框架中,使用一個Python的原生函式映射批次迭代器,該函式在輸入與輸出皆為 pyarrow.RecordBatchs 上執行,並將結果以資料框架的形式回傳。
語法
mapInArrow(func: "ArrowMapIterFunction", schema: Union[StructType, str], barrier: bool = False, profile: Optional[ResourceProfile] = None)
參數
| 參數 | 類型 | 說明 |
|---|---|---|
func |
函式 | 一個Python的原生函式,會接受一個 pyarrow.RecordBatchs 的迭代器,並輸出一個 pyarrow.RecordBatchs 的迭代器。 |
schema |
DataType 或 str | PySpark 中 的 func 回傳類型。 該值可以是 pyspark.sql.types.DataType 物件或 DDL 格式的型別字串。 |
barrier |
bool、可選、預設 False | 使用障礙模式執行,確保該階段中的所有 Python 工作者都能同時啟動。 |
profile |
ResourceProfile,選用 | 可選的資源配置檔(ResourceProfile)用於 mapInArrow。 |
退貨
DataFrame
Examples
import pyarrow as pa
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for batch in iterator:
pdf = batch.to_pandas()
yield pa.RecordBatch.from_pandas(pdf[pdf.id == 1])
df.mapInArrow(filter_func, df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
df.mapInArrow(filter_func, df.schema, barrier=True).collect()
# [Row(id=1, age=21)]