API-интерфейсы функций Pandas
API-интерфейсы функций pandas позволяют напрямую применять собственную функцию Python, которая принимает и выводит экземпляры Pandas в кадр данных PySpark. Как и в случае с пользовательскими функциями Pandas, API-интерфейсы функций также используют Apache Arrow для передачи данных и pandas для работы с данными; Однако указания типов Python являются необязательными в API-интерфейсах функций Pandas.
Существует три типа API-интерфейсов функций Pandas:
- Сгруппированная карта
- Карта
- Согруппированная карта
API-интерфейсы функций pandas используют ту же внутреннюю логику, что и при выполнении определяемой пользователем функции Pandas. Они имеют общие характеристики, такие как PyArrow, поддерживаемые типы SQL и конфигурации.
Дополнительные сведения см. в записи блога New Pandas UDFs and Python Type Hints в предстоящем выпуске Apache Spark 3.0.
Сгруппированная карта
Вы преобразуете сгруппированные данные с помощью groupBy().applyInPandas()
для реализации шаблона "разделение-применение-объединение". Разделение, применение и объединение состоит из трех этапов:
- Разделите данные на группы с помощью
DataFrame.groupBy
. - Примените функцию к каждой группе. Входные и выходные данные функции являются
pandas.DataFrame
. Входные данные содержат все строки и столбцы для каждой группы. - Объедините результаты в новый
DataFrame
.
Чтобы использовать groupBy().applyInPandas()
, необходимо определить следующее:
- Функция Python, которая определяет вычисления для каждой группы.
- Объект
StructType
или строка, определяющие схему выходных данных.DataFrame
Метки столбцов возвращаемого pandas.DataFrame
объекта должны соответствовать именам полей в определенной выходной схеме, если они указаны в виде строк, или соответствовать типам данных полей по позиции, если не строки, например целочисленные индексы. См. pandas. Кадр данных о том, как помечать столбцы при создании pandas.DataFrame
.
Все данные для группы загружаются в память перед применением функции. Это может привести к исключению нехватки памяти, особенно в случае неравномерного распределения размеров групп. Конфигурация maxRecordsPerBatch не применяется к группам, и вы можете убедиться, что сгруппированные данные помещаются в доступную память.
В следующем примере показано, как использовать groupby().apply()
для вычитания среднего значения из каждого значения в группе.
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id| v|
# +---+----+
# | 1|-0.5|
# | 1| 0.5|
# | 2|-3.0|
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
Подробные сведения об использовании см. в разделе pyspark.sql.GroupedData.applyInPandas.
Карта
Операции сопоставления с экземплярами Pandas выполняются для DataFrame.mapInPandas()
преобразования итератора pandas.DataFrame
в другой итератор pandas.DataFrame
, представляющий текущий кадр данных PySpark, и возвращает результат в виде кадра данных PySpark.
Базовая функция принимает и выводит итератор pandas.DataFrame
. Он может возвращать выходные данные произвольной длины в отличие от некоторых определяемых пользователем функций Pandas, таких как Series, в series.
В следующем примере показано, как использовать mapInPandas()
:
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# | 1| 21|
# +---+---+
Подробные сведения об использовании см. в разделе pyspark.sql.DataFrame.mapInPandas.
Согруппированная карта
Для совместно сгруппированных операций сопоставления с экземплярами Pandas используйте DataFrame.groupby().cogroup().applyInPandas()
для совместной группы два pySpark DataFrame
по общему ключу, а затем примените функцию Python к каждой совместной группе, как показано ниже:
- Перемешивайте данные таким образом, чтобы группы каждого кадра данных, которые совместно используют ключ, были совместно группированы.
- Примените функцию к каждой совместной группе. Входные данные функции — два
pandas.DataFrame
(с необязательным кортежем, представляющим ключ). Выходные данные функции —pandas.DataFrame
. - Объедините элементы
pandas.DataFrame
из всех групп в новый PySparkDataFrame
.
Чтобы использовать groupBy().cogroup().applyInPandas()
, необходимо определить следующее:
- Функция Python, которая определяет вычисления для каждой совместной группы.
- Объект
StructType
или строка, определяющая схему выходных данных PySparkDataFrame
.
Метки столбцов возвращаемого pandas.DataFrame
объекта должны соответствовать именам полей в определенной выходной схеме, если они указаны в виде строк, или соответствовать типам данных полей по позиции, если не строки, например целочисленные индексы. См. pandas. Кадр данных о том, как помечать столбцы при создании pandas.DataFrame
.
Все данные для совместной группы загружаются в память перед применением функции. Это может привести к исключению нехватки памяти, особенно в случае неравномерного распределения размеров групп. Конфигурация для maxRecordsPerBatch не применяется, и вы можете убедиться, что совместно сгруппированные данные помещаются в доступную память.
В следующем примере показано, как использовать groupby().cogroup().applyInPandas()
для выполнения между asof join
двумя наборами данных.
import pandas as pd
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("time", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# | time| id| v1| v2|
# +--------+---+---+---+
# |20000101| 1|1.0| x|
# |20000102| 1|3.0| x|
# |20000101| 2|2.0| y|
# |20000102| 2|4.0| y|
# +--------+---+---+---+
Подробные сведения об использовании см. в разделе pyspark.sql.PandasCogroupedOps.applyInPandas.