Поделиться через


Что такое функции, определяемые пользователем (UDFs)?

Определяемые пользователем функции позволяют повторно использовать и совместно использовать код, расширяющий встроенные функции в Azure Databricks. Используйте функции, определяемые пользователем, для выполнения специфических задач, таких как сложные вычисления, преобразования или кастомизация данных.

Когда следует использовать функцию UDF и Apache Spark?

Используйте UDF (определяемые пользователем функции) для логики, которую сложно выразить с помощью встроенных функций Apache Spark. Встроенные функции Apache Spark оптимизированы для распределенной обработки и обеспечивают более высокую производительность в масштабе. См. дополнительные сведения о функциях.

Databricks рекомендует пользовательские функции для нерегламентированных запросов, ручной очистки данных, а также анализа и операций с небольшими и средними наборами данных. Обычные случаи использования UDF включают шифрование и расшифровку данных, хэширование, анализ JSON и валидацию.

Используйте методы Apache Spark для операций с очень большими наборами данных и любыми рабочими нагрузками, которые выполняются регулярно или непрерывно, включая задания ETL и операции потоковой передачи.

Общие сведения о типах UDF

Выберите тип UDF на следующих вкладках, чтобы просмотреть описание, пример и ссылку, чтобы узнать больше.

Скалярная пользовательская функция (UDF)

Скалярные UDF работают с одной строкой и возвращают однозначное результирующее значение для каждой строки. Они могут быть под управлением каталога Unity или иметь область действия сеанса.

В следующем примере используется скалярная UDF для вычисления длины каждого имени в столбце name и добавления значения в новый столбец name_length.

+-------+-------+
| name  | score |
+-------+-------+
| alice |  10.0 |
| bob   |  20.0 |
| carol |  30.0 |
| dave  |  40.0 |
| eve   |  50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION main.test.get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);

-- Use the UDF in a SQL query
SELECT name, main.test.get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name  | score | name_length |
+-------+-------+-------------+
| alice |  10.0 |      5      |
|  bob  |  20.0 |      3      |
| carol |  30.0 |      5      |
| dave  |  40.0 |      4      |
|  eve  |  50.0 |      3      |
+-------+-------+-------------+

Для реализации этого в записной книжке Databricks с помощью PySpark выполните указанные ниже действия.

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(returnType=IntegerType())
def get_name_length(name):
  return len(name)

df = df.withColumn("name_length", get_name_length(df.name))

# Show the result
display(df)

См. определяемые пользователем функции (UDF) в каталоге Unity и определяемые пользователем скалярные функции на Python.

Пакетные скалярные пользовательские функции

Обработка данных в пакетах при сохранении четности входных и выходных строк 1:1. Это снижает накладные расходы на операции с отдельными строками при обработке крупномасштабных данных. Пакетные функции, определяемые пользователем (UDF), также поддерживают сохранение состояния между пакетами, что позволяет более эффективно выполнять задачи, повторно использовать ресурсы и обрабатывать сложные вычисления, для которых требуется целостный контекст в блоках данных.

Они могут быть под управлением каталога Unity или иметь область действия сеанса.

Следующая функция UDF каталога Unity для пакетной обработки в Python вычисляет индекс массы тела (BMI) в процессе обработки пакетов строк.

+-------------+-------------+
| weight_kg   | height_m    |
+-------------+-------------+
|      90     |     1.8     |
|      77     |     1.6     |
|      50     |     1.5     |
+-------------+-------------+
%sql
CREATE OR REPLACE FUNCTION main.test.calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for weight_series, height_series in batch_iter:
    yield weight_series / (height_series ** 2)
$$;

select main.test.calculate_bmi_pandas(cast(70 as double), cast(1.8 as double));
+--------+
|  BMI   |
+--------+
|  27.8  |
|  30.1  |
|  22.2  |
+--------+

См. определяемые пользователем функции (UDF) в каталоге Unity и пакетные функции Python (UDF) в каталоге Unity.

Нескалярные определяемые пользователем функции

Нескалярные пользовательские функции работают с целыми наборами/столбцами данных с гибкими отношениями ввода-вывода (1:N или многие:многие).

Пакетные векторизованные функции pandas, строго относящиеся к сеансу, могут быть следующих типов:

  • Серия к серии
  • Итератор серии в итератор серии
  • Итератор нескольких рядов в итератор серии
  • Серия к скаляру

Пример UDF pandas для преобразования Series в Series приведен ниже.

from pyspark.sql.functions import pandas_udf
import pandas as pd

df = spark.createDataFrame([(70, 1.75), (80, 1.80), (60, 1.65)], ["Weight", "Height"])

@pandas_udf("double")
def calculate_bmi_pandas(weight: pd.Series, height: pd.Series) -> pd.Series:
  return weight / (height ** 2)

df.withColumn("BMI", calculate_bmi_pandas(df["Weight"], df["Height"])).display()

См. определяемые пользователем функции Pandas.

UDAF

UDAFs работают с несколькими строками и возвращают один агрегированный результат. UDAFs ограничены только областью сеанса.

В следующем примере UDAF суммируются оценки по длине имени.

from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd

# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
  return scores.sum()

# Group by name length and aggregate
result_df = (df.groupBy("name_length")
  .agg(total_score_udf(df["score"]).alias("total_score")))

display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
|      3      |     70.0    |
|      4      |     40.0    |
|      5      |     40.0    |
+-------------+-------------+

Обратитесь к пользовательским функциям pandas для Python и определяемым пользователем агрегатным функциям для Scala.

Определяемые пользователем функции

UDTF принимает один или несколько входных аргументов и возвращает несколько строк (и, возможно, несколько столбцов) для каждой входной строки. Они могут быть под управлением каталога Unity или иметь область действия сеанса.

Следующий UDTF создает таблицу с помощью фиксированного списка двух целых аргументов:

CREATE OR REPLACE FUNCTION get_sum_diff(x INT, y INT)
RETURNS TABLE (sum INT, diff INT)
LANGUAGE PYTHON
HANDLER 'GetSumDiff'
AS $$
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y
$$;

SELECT * FROM get_sum_diff(10, 3);
+-----+------+
| sum | diff |
+-----+------+
| 13  | 7    |
+-----+------+

Для реализации этого в записной книжке Databricks с помощью PySpark выполните указанные ниже действия.

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

См. UDTFs каталога Unity и определяемые сеансом определяемые пользователем функции.

Управляемые Каталогом Unity и относящиеся к области сеанса UDF (пользовательские функции)

Определяемые пользователем UDFS каталога Unity каталога Unity и каталог Unity каталога Unity сохраняются в каталоге Unity для улучшения управления, повторного использования и обнаружения. Все остальные определяемые пользователем функции сеансовые, что означает, что они определены в записной книжке или задании и ограничиваются текущим SparkSession. Вы можете определить и получить доступ к определяемой пользователем области сеанса с помощью Scala или Python.

Каталог Unity: памятка по управляемым пользовательским функциям (UDF)

Каталог Unity с управляемыми UDF (определяемыми пользователем функциями) позволяет определять, использовать, безопасно делиться и управлять этими функциями в вычислительных средах. См. функции, определяемые пользователем (UDF), в каталоге Unity.

Тип UDF Поддерживаемые вычислительные ресурсы Описание
Python UDF каталога Unity
  • Бессерверные записные книжки и задания
  • Классические вычисления с стандартным режимом доступа (Databricks Runtime 13.3 LTS и более поздней версии)
  • Хранилище SQL (бессерверный и профессиональный)
  • Декларативные конвейеры Spark Lakeflow (классические и бессерверные)
Определите UDF в Python и зарегистрируйте его в каталоге Unity для управления.
Скалярные UDF работают с одной строкой и возвращают однозначное результирующее значение для каждой строки.
UDF пакетного каталога Unity
  • Бессерверные записные книжки и задания
  • Классические вычисления с стандартным режимом доступа (Databricks Runtime 16.3 и более поздних версий)
  • Хранилище SQL (бессерверный и профессиональный)
Определите UDF в Python и зарегистрируйте его в каталоге Unity для управления.
Пакетные операции с несколькими значениями и возврат нескольких значений. Снижает издержки на построчные операции для обработки крупномасштабных данных.
UDTF каталога Unity
  • Бессерверные записные книжки и задания
  • Классические вычисления с стандартным режимом доступа (Databricks Runtime 17.1 и более поздних версий)
  • Хранилище SQL (бессерверный и профессиональный)
Определите UDTF в Python и зарегистрируйте его в каталоге Unity для управления.
UDTF принимает один или несколько входных аргументов и возвращает несколько строк (и, возможно, несколько столбцов) для каждой входной строки.

Памятка по определяемой пользователем области сеанса для изолированных пользователем вычислений

Определяемые пользователем функции, привязанные к сеансу, определяются в записной книжке или задании и относятся к текущему SparkSession. Вы можете определить и получить доступ к определяемой пользователем области сеанса с помощью Scala или Python.

Тип UDF Поддерживаемые вычислительные ресурсы Описание
Python скаляр
  • Бессерверные записные книжки и задания
  • Классические вычисления с стандартным режимом доступа (Databricks Runtime 13.3 LTS и более поздней версии)
  • Декларативные конвейеры Spark Lakeflow (классические и бессерверные)
Скалярные UDF работают с одной строкой и возвращают однозначное результирующее значение для каждой строки.
Python нескалярный
  • Бессерверные записные книжки и задания
  • Классические вычисления с стандартным режимом доступа (Databricks Runtime 14.3 LTS и выше)
  • Декларативные конвейеры Spark Lakeflow (классические и бессерверные)
Нескалярные определяемые пользователем функции включают pandas_udf, mapInPandas, mapInArrow, applyInPandas. Пользовательские функции (UDFs) Pandas используют Apache Arrow для передачи данных и Pandas для работы с данными. Пользовательские функции Pandas (UDF) поддерживают векторные операции, которые могут значительно улучшить производительность по сравнению со скалярными пользовательскими функциями, обрабатывающими строки.
Определяемые пользователем функции Python
  • Бессерверные записные книжки и задания
  • Классические вычисления с стандартным режимом доступа (Databricks Runtime 14.3 LTS и выше)
  • Декларативные конвейеры Spark Lakeflow (классические и бессерверные)
UDTF принимает один или несколько входных аргументов и возвращает несколько строк (и, возможно, несколько столбцов) для каждой входной строки.
Скалярные пользовательские функции Scala
  • Классические вычисления с стандартным режимом доступа (Databricks Runtime 13.3 LTS и более поздней версии)
Скалярные UDF работают с одной строкой и возвращают однозначное результирующее значение для каждой строки.
Scala UDAFs
  • Классические вычисления с выделенным режимом доступа (Databricks Runtime 14.2 LTS и выше)
UDAFs работают с несколькими строками и возвращают один агрегированный результат.

Рекомендации по производительности

  • Встроенные функции и пользовательские функции SQL являются наиболее эффективными вариантами.

  • Масштабируемые определяемые пользователем функции обычно быстрее, чем определяемые пользователем Функции Python.

    • Несоизолированные UDF функции Scala выполняются в виртуальной машине Java (JVM), поэтому они позволяют избежать затрат на перемещение данных в JVM и из него.
    • Изолированные пользовательские функции Scala должны перемещать данные в и из JVM, но они по-прежнему могут быть быстрее, чем функции Python UDF, так как они более эффективно обрабатывают память.
  • Определяемые пользователем функции Python и функции pandas обычно работают медленнее, чем функции Scala, потому что им нужно сериализовать данные и перемещать их из JVM в интерпретатор Python.

    • Пользовательские функции Pandas в 100 раз быстрее, чем функции Python, так как они используют Apache Arrow для снижения затрат на сериализацию.