事件
3月31日 下午11時 - 4月2日 下午11時
最終Microsoft Fabric、Power BI、SQL 和 AI 社群主導的活動。 2025 年 3 月 31 日至 4 月 2 日。
立即註冊pandas 函式 API 可讓您直接套用 Python 原生函式,以將 pandas 實例採用並輸出至 PySpark DataFrame。 與 pandas 使用者定義函式類似,函式 API 也會使用 Apache Arrow 來傳輸資料和 pandas 來處理資料;不過,Pandas 函式 API 中的 Python 類型提示是選擇性的。
Pandas 函式 API 有三種類型:
pandas 函式 API 會利用 pandas UDF 執行所使用的相同內部邏輯。 它們會共用 PyArrow、支援的 SQL 類型和組態等特性。
如需詳細資訊,請參閱 即將推出的 Apache Spark 3.0 版本中的部落格文章 New Pandas UDF 和 Python 類型提示。
您可以使用 來轉換群組資料 groupBy().applyInPandas()
,以實作「split-apply-combine」模式。 Split-apply-combine 包含三個步驟:
DataFrame.groupBy
將資料分割成群組。pandas.DataFrame
。 輸入資料包含每個群組的所有資料列和資料行。DataFrame
。若要使用 groupBy().applyInPandas()
,您必須定義下列專案:
StructType
或定義輸出架構的字串 DataFrame
傳回 pandas.DataFrame
之 的資料行標籤必須符合所定義輸出架構中的功能變數名稱,如果指定為字串,則比對欄位資料類型,例如整數索引。 請參閱 pandas。DataFrame 說明如何在建構 時標記資料行 pandas.DataFrame
。
套用函式之前,群組的所有資料都會載入記憶體中。 這可能會導致記憶體不足的例外狀況,特別是在群組大小扭曲時。 maxRecordsPerBatch的組態不會套用在群組上,因此您必須確定已分組的資料符合可用的記憶體。
下列範例示範如何使用 groupby().apply()
來減去群組中每個值的平均值。
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
如需詳細使用量,請參閱 pyspark.sql.GroupedData.applyInPandas。
您可以使用 pandas 實例執行對應作業, DataFrame.mapInPandas()
以便將 的 pandas.DataFrame
反覆運算器轉換成另一個反覆運算器 pandas.DataFrame
,代表目前的 PySpark DataFrame,並以 PySpark DataFrame 傳回結果。
基礎函式會接受 並輸出 的 pandas.DataFrame
反覆運算器。 它可以傳回任意長度的輸出,與某些 Pandas UDF 相較之下,例如數列到數列。
下列範例示範如何使用 mapInPandas()
:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
如需詳細的使用方式,請參閱 pyspark.sql.DataFrame.mapInPandas。
針對具有 pandas 實例的共同群組對應作業,請使用 DataFrame.groupby().cogroup().applyInPandas()
將兩個 PySpark DataFrame
共同群組在一起,然後將 Python 函式套用至每個共同群組,如下所示:
pandas.DataFrame
個 (,選擇性 Tuple 代表索引鍵) 。 函式的輸出是 pandas.DataFrame
。pandas.DataFrame
將所有群組中的 合併為新的 PySpark DataFrame
。若要使用 groupBy().cogroup().applyInPandas()
,您必須定義下列專案:
StructType
物件或字串,定義輸出 PySpark DataFrame
的架構。傳回 pandas.DataFrame
之 的資料行標籤必須符合所定義輸出架構中的功能變數名稱,如果指定為字串,則比對欄位資料類型,例如整數索引。 請參閱 pandas。DataFrame 說明如何在建構 時標記資料行 pandas.DataFrame
。
套用函式之前,會將共同群組的所有資料載入記憶體中。 這可能會導致記憶體不足的例外狀況,特別是在群組大小扭曲時。 未套用 maxRecordsPerBatch 的組態,而且您必須確定已群組的資料符合可用的記憶體。
下列範例示範如何使用 groupby().cogroup().applyInPandas()
在兩個 asof join
資料集之間執行 。
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
如需詳細使用量,請參閱 pyspark.sql.PandasCogroupedOps.applyInPandas。
事件
3月31日 下午11時 - 4月2日 下午11時
最終Microsoft Fabric、Power BI、SQL 和 AI 社群主導的活動。 2025 年 3 月 31 日至 4 月 2 日。
立即註冊訓練
模組
在 Microsoft Fabric 中使用 Apache Spark - Training
Apache Spark 是大規模資料分析的核心技術。 Microsoft Fabric 為 Spark 叢集提供了支援,可讓您大規模分析及處理資料。
文件
Spark 上的 Pandas API - Azure Databricks
了解如何在 Spark 上使用 Pandas API 來存取 Azure Databricks 中的資料。
您可以在 Azure Databricks 上使用 Pandas 嗎? - Azure Databricks
探索在 Azure Databricks 上使用 pandas 的選項。 使用 DataFrames、轉換為 PySpark,並使用 Arrow 套用函式。
在 PySpark 與 pandas DataFrame 之間轉換 - Azure Databricks
瞭解如何使用 Azure Databricks 中的 Apache 箭號,在 Pandas DataFrame 中來回轉換 Apache Spark DataFrame。