Поделиться через


Определяемые пользователем функции pandas

Определяемая пользователем функция (UDF) pandas, также известная как векторная UDF, использует Apache Arrow для перемещения данных и pandas для работы с данными. UDF pandas поддерживают векторизованные операции, которые могут повысить производительность до 100 раз по сравнению с UDF Python, выполняющимися построчно.

Дополнительные сведения см. в записи блога New Pandas UDFs и Подсказки типов Python в предстоящем выпуске Apache Spark 3.0.

Вы можете определить UDF pandas, используя ключевое слово pandas_udf в качестве декоратора, и обернуть функцию с использованием подсказки типа Python. В этой статье описаны разные типы UDF pandas и показано, как использовать UDF pandas с подсказками типов.

UDF, которые принимают последовательность и возвращают последовательность

Вы можете использовать UDF pandas, которые принимают последовательность и возвращают последовательность, для векторизации скалярных операций. Их можно использовать с API, такими как select и withColumn.

Функция Python должна принимать серию pandas в качестве входных данных и возвращать серию pandas той же длины. Вам нужно указать это в подсказках типа Python. Spark запускает UDF pandas путем разделения столбцов на пакеты, вызывая функцию для каждого пакета в качестве подмножества данных, а затем объединяя результаты.

В следующем примере показано, как создать UDF pandas, вычисляющий продукт из 2 столбцов.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

UDF, которые принимают итератор последовательности и возвращают итератор последовательности

UDF с поддержкой итератора аналогична скалярной UDF pandas, за следующими исключениями:

  • Функция Python
    • Принимает итератор пакетов вместо одного входного пакета в качестве входных данных.
    • Возвращает итератор выходных пакетов, а не один выходной пакет.
  • Длина всех выходных данных в итераторе должна совпадать с длиной всех входных данных.
  • UDF в оболочке pandas принимает один столбец Spark в качестве входных данных.

Следует указать подсказку типа Python как Iterator[pandas.Series] ->Iterator[pandas.Series].

Эта UDF pandas полезна, когда для выполнения UDF требуется инициализация некоторого состояния, например загрузки файла модели машинного обучения для применения вывода к каждому входному пакету.

В следующем примере показано, как создать UDF pandas с поддержкой итератора.

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

UDF, которые принимают итератор по нескольким последовательностям и возвращают итератор последовательности

Такие UDF имеет те же характеристики и ограничения, что и UDF, которые принимают итератор последовательности и возвращают итератор последовательности. Указанная функция принимает итератор пакетов и выводит итератор пакетов. Она также полезна, если для выполнения UDF требуется инициализация некоторого состояния.

Различия описаны ниже.

  • Базовая функция Python принимает итератор кортежа последовательностей pandas.
  • UDF оболочки pandas принимает несколько столбцов Spark в качестве входных данных.

Подсказки типов указываются как Iterator[Tuple[pandas.Series, ...]] ->Iterator[pandas.Series].

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

UDF, которые принимают последовательность и возвращают скалярное значение

Такие UDF pandas похожи на агрегатные функции Spark. Серия для скалярных панд UDF определяет агрегирование из одной или нескольких серий pandas в скалярное значение, где каждая серия pandas представляет столбец Spark. Вы используете pandas UDF для преобразования Series в скаляр с такими API, как select, withColumn, groupBy.aggи в pyspark.sql.Window.

Подсказки типов указываются как pandas.Series, ... ->Any. Возвращаемый тип должен быть примитивным типом данных, а возвращаемое скалярное значение может быть примитивным типом Python, например int или float, либо типом данных NumPy, например numpy.int64 или numpy.float64. Тип Any в идеале должен быть определенным скалярным типом.

Этот тип UDF не поддерживает частичное агрегирование, и все данные для каждой группы загружаются в память.

В следующем примере показано, как с помощью этого типа UDF рассчитать среднее значение с использованием операций select, groupBy и window.

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

Подробные сведения об использовании см. в разделе pyspark.sql.functions.pandas_udf.

Использование

Настройка пакета Arrow

Примечание.

Эта конфигурация не влияет на вычислительные ресурсы, настроенные в режиме общего доступа, и Databricks Runtime 13.3 LTS до 14.2.

Секции данных в Spark преобразуются в пакеты записей Arrow, что может временно вызвать интенсивное использование памяти в JVM. Чтобы избежать возможных исключений из-за нехватки памяти, вы можете настроить размер пакетов записей Arrow, задав для конфигурации spark.sql.execution.arrow.maxRecordsPerBatch целое число, определяющее максимальное количество строк для каждого пакета. Значение по умолчанию — 10 000 записей на пакет. Если число столбцов большое, значение должно быть скорректировано соответствующим образом. С помощью этого ограничения каждая секция данных делится на 1 или более пакетов записей для обработки.

Метка времени с семантикой часовых поясов

Spark внутренне хранит метки времени в виде значений UTC, и данные меток времени, поступающие без указанного часового пояса, преобразуются из местного времени в UTC с точностью до микросекунды.

Если данные метки времени экспортируются или отображаются в Spark, часовой пояс сеанса используется для локализации значений метки времени. Часовой пояс сеанса устанавливается с конфигурацией spark.sql.session.timeZone и по умолчанию используется локальный часовой пояс системы JVM. Pandas использует тип datetime64 с разрешением наносекунд, datetime64[ns], и с необязательным часовым поясом для каждого столбца.

Когда данные о метках времени передаются из Spark в pandas, они преобразуются в наносекунды. Затем каждый столбец преобразуется в часовой пояс текущей сессии Spark, после чего значения локализуются, удаляя информацию о часовом поясе и отображая их в виде локального времени. Это происходит при вызове toPandas() или pandas_udf со столбцами метки времени.

Когда данные метки времени передаются из pandas в Spark, они преобразуются в микросекунды UTC. Это происходит при вызове createDataFrame с помощью pandas DataFrame или при возврате метки времени из UDF pandas. Эти преобразования выполняются автоматически, чтобы гарантировать наличие данных в Spark в ожидаемом формате, поэтому вам не нужно выполнять какие-либо из этих преобразований самостоятельно. Все значения наносекунд обрезаны.

Стандартная UDF загружает данные метки времени как объекты даты и времени Python, что отличается от метки времени pandas. Чтобы получить лучшую производительность, рекомендуется использовать функциональность временных рядов pandas при работе с временными метками в pandas UDF. Дополнительные сведения см. в статье Временные ряды и функции дат.

Пример записной книжки

В следующей записной книжке показано, как оптимизировать производительность с помощью UDF pandas:

Пример записной книжки для оптимизации производительности с помощью UDF pandas

Получение записной книжки