Поделиться через


Определяемые пользователем функции pandas

Определяемая пользователем функция (UDF) pandas, также известная как векторная UDF, использует Apache Arrow для перемещения данных и pandas для работы с данными. UDF pandas поддерживают векторизованные операции, которые могут повысить производительность до 100 раз по сравнению с UDF Python, выполняющимися построчно.

Дополнительные сведения см. в записи блога New Pandas UDFs и Подсказки типов Python в предстоящем выпуске Apache Spark 3.0.

Вы можете определить UDF pandas, используя ключевое слово pandas_udf в качестве декоратора, и обернуть функцию с использованием подсказки типа Python. В этой статье описаны разные типы UDF pandas и показано, как использовать UDF pandas с подсказками типов.

UDF, которые принимают последовательность и возвращают последовательность

Вы можете использовать UDF pandas, которые принимают последовательность и возвращают последовательность, для векторизации скалярных операций. Их можно использовать с API, такими как select и withColumn.

Функция Python должна принимать серию pandas в качестве входных данных и возвращать серию pandas той же длины. Вам нужно указать это в подсказках типа Python. Spark запускает UDF pandas, разбивая столбцы на пакеты, вызывая функцию для каждого пакета как набор данных, а затем объединяя результаты.

В следующем примере показано, как создать UDF pandas, которая вычисляет произведение двух столбцов.

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+

UDF, которые принимают итератор последовательности и возвращают итератор последовательности

UDF с поддержкой итератора аналогична скалярной UDF pandas, за следующими исключениями:

  • Функция Python
    • Принимает итератор пакетов вместо одного входного пакета в качестве входных данных.
    • Возвращает итератор выходных пакетов, а не один выходной пакет.
  • Длина всех выходных данных в итераторе должна совпадать с длиной всех входных данных.
  • Обернутая UDF pandas в качестве входных данных принимается один столбец Spark.

Следует указать подсказку типа Python как Iterator[pandas.Series] ->Iterator[pandas.Series].

Эта UDF pandas полезна, когда для выполнения UDF требуется инициализация некоторого состояния, например загрузки файла модели машинного обучения для применения вывода к каждому входному пакету.

В следующем примере показано, как создать UDF pandas с поддержкой итератора.

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # initialize states
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # release resources here, if any

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+

UDF, которые принимают итератор по нескольким последовательностям и возвращают итератор последовательности

Такие UDF имеет те же характеристики и ограничения, что и UDF, которые принимают итератор последовательности и возвращают итератор последовательности. Указанная функция принимает итератор пакетов и выводит итератор пакетов. Она также полезна, если для выполнения UDF требуется инициализация некоторого состояния.

Различия описаны ниже.

  • Базовая функция Python принимает итератор кортежа последовательностей pandas.
  • Обернутая UDF pandas принимает несколько столбцов Spark в качестве входных данных.

Подсказки типов указываются как Iterator[Tuple[pandas.Series, ...]] ->Iterator[pandas.Series].

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+

UDF, которые принимают последовательность и возвращают скалярное значение

Такие UDF pandas похожи на агрегатные функции Spark. Они определяют агрегирование из одной или нескольких последовательностей pandas в скалярное значение, где каждая последовательность pandas представляет столбец Spark. Вы можете использовать такие UDF pandas с API, например select, withColumn, groupBy.agg и pyspark.sql.Window.

Подсказки типов указываются как pandas.Series, ... ->Any. Возвращаемый тип должен быть примитивным типом данных, а возвращаемое скалярное значение может быть примитивным типом Python, например int или float, либо типом данных NumPy, например numpy.int64 или numpy.float64. Тип Any в идеале должен быть определенным скалярным типом.

Этот тип UDF не поддерживает частичное агрегирование, и все данные для каждой группы загружаются в память.

В следующем примере показано, как с помощью этого типа UDF рассчитать среднее значение с использованием операций select, groupBy и window.

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+

Подробные сведения об использовании см. в разделе pyspark.sql.functions.pandas_udf.

Использование

Настройка пакета Arrow

Примечание.

Эта конфигурация не влияет на вычислительные ресурсы, настроенные в режиме общего доступа, и Databricks Runtime 13.3 LTS до 14.2.

Секции данных в Spark преобразуются в пакеты записей Arrow, что может временно вызвать интенсивное использование памяти в JVM. Чтобы избежать возможных исключений из-за нехватки памяти, вы можете настроить размер пакетов записей Arrow, задав для конфигурации spark.sql.execution.arrow.maxRecordsPerBatch целое число, определяющее максимальное количество строк для каждого пакета. Значение по умолчанию — 10 000 записей на пакет. Если количество столбцов велико, значение должно быть соответствующим образом изменено. При использовании этого ограничения каждая секция данных делится на один или несколько пакетов записей для обработки.

Метка времени с семантикой часовых поясов

Spark внутренне сохраняет метки времени в виде значений UTC, а данные меток времени, введенные без указания часового пояса, преобразуются как местное время в UTC с разрешением в микросекунды.

Когда данные метки времени экспортируются или отображаются в Spark, часовой пояс сеанса используется для локализации значений метки времени. Часовой пояс сеанса задается в конфигурации spark.sql.session.timeZone и по умолчанию соответствует локальному часовому поясу системы JVM. pandas использует тип datetime64 с разрешением в наносекунды (datetime64[ns]) с необязательным часовым поясом для каждого столбца.

Когда данные метки времени передаются из Spark в pandas, они преобразуются в наносекунды, и каждый столбец преобразуется в часовой пояс сеанса Spark, а затем локализуется в этом часовом поясе, после чего часовой пояс удаляется и значения отображаются в виде местного времени. Это происходит при вызове toPandas() или pandas_udf со столбцами меток времени.

Когда данные метки времени передаются из pandas в Spark, они преобразуются в микросекунды UTC. Это происходит при вызове createDataFrame с помощью pandas DataFrame или при возврате метки времени из UDF pandas. Эти преобразования выполняются автоматически, чтобы гарантировать наличие данных в Spark в ожидаемом формате, поэтому вам не нужно выполнять какие-либо из этих преобразований самостоятельно. Все значения в наносекундах усекаются.

Стандартная UDF загружает данные метки времени как объекты даты и времени Python, что отличается от метки времени pandas. Чтобы добиться максимальной производительности, мы рекомендуем использовать функции временных рядов pandas при работе с временными метками в UDF pandas. Дополнительные сведения см. в статье Временные ряды и функции дат.

Пример записной книжки

В следующей записной книжке показано, как оптимизировать производительность с помощью UDF pandas:

Пример записной книжки для оптимизации производительности с помощью UDF pandas

Получить записную книжку