Поделиться через


Что такое функции, определяемые пользователем (UDFs)?

Определяемые пользователем функции позволяют повторно использовать и совместно использовать код, расширяющий встроенные функции в Azure Databricks. Используйте функции, определяемые пользователем, для выполнения специфических задач, таких как сложные вычисления, преобразования или кастомизация данных.

Когда следует использовать функцию UDF и Apache Spark?

Используйте UDF (определяемые пользователем функции) для логики, которую сложно выразить с помощью встроенных функций Apache Spark. Встроенные функции Apache Spark оптимизированы для распределенной обработки и обеспечивают более высокую производительность в масштабе. См. дополнительные сведения о функциях.

Databricks рекомендует пользовательские функции для нерегламентированных запросов, ручной очистки данных, а также анализа и операций с небольшими и средними наборами данных. Обычные случаи использования UDF включают шифрование и расшифровку данных, хэширование, анализ JSON и валидацию.

Используйте методы Apache Spark для операций с очень большими наборами данных и любых рабочих нагрузок, выполняемых регулярно или непрерывно, включая задания ETL и операции потоковой передачи.

Общие сведения о типах UDF

Выберите тип UDF на следующих вкладках, чтобы просмотреть описание, пример и ссылку, чтобы узнать больше.

Скалярная пользовательская функция (UDF)

Скалярные UDF работают с одной строкой и возвращают однозначное результирующее значение для каждой строки. Они могут быть под управлением каталога Unity или иметь область действия сеанса.

В следующем примере используется скалярная UDF для вычисления длины каждого имени в столбце name и добавления значения в новый столбец name_length.

+-------+-------+
| name  | score |
+-------+-------+
| alice |  10.0 |
| bob   |  20.0 |
| carol |  30.0 |
| dave  |  40.0 |
| eve   |  50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION main.test.get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);

-- Use the UDF in a SQL query
SELECT name, main.test.get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name  | score | name_length |
+-------+-------+-------------+
| alice |  10.0 |      5      |
|  bob  |  20.0 |      3      |
| carol |  30.0 |      5      |
| dave  |  40.0 |      4      |
|  eve  |  50.0 |      3      |
+-------+-------+-------------+

Для реализации этого в записной книжке Databricks с помощью PySpark выполните указанные ниже действия.

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(returnType=IntegerType())
def get_name_length(name):
  return len(name)

df = df.withColumn("name_length", get_name_length(df.name))

# Show the result
display(df)

См. определяемые пользователем функции (UDF) в каталоге Unity и определяемые пользователем скалярные функции на Python.

Пакетные скалярные пользовательские функции

Обработка данных в пакетах при сохранении четности входных и выходных строк 1:1. Это снижает накладные расходы на операции с отдельными строками при обработке крупномасштабных данных. Пакетные функции, определяемые пользователем (UDF), также поддерживают сохранение состояния между пакетами, что позволяет более эффективно выполнять задачи, повторно использовать ресурсы и обрабатывать сложные вычисления, для которых требуется целостный контекст в блоках данных.

Они могут быть под управлением каталога Unity или иметь область действия сеанса.

Следующая функция UDF каталога Unity для пакетной обработки в Python вычисляет индекс массы тела (BMI) в процессе обработки пакетов строк.

+-------------+-------------+
| weight_kg   | height_m    |
+-------------+-------------+
|      90     |     1.8     |
|      77     |     1.6     |
|      50     |     1.5     |
+-------------+-------------+
%sql
CREATE OR REPLACE FUNCTION main.test.calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for weight_series, height_series in batch_iter:
    yield weight_series / (height_series ** 2)
$$;

select main.test.calculate_bmi_pandas(cast(70 as double), cast(1.8 as double));
+--------+
|  BMI   |
+--------+
|  27.8  |
|  30.1  |
|  22.2  |
+--------+

См. определяемые пользователем функции (UDF) в каталоге Unity и пакетные функции Python (UDF) в каталоге Unity.

Нескалярные определяемые пользователем функции

Нескалярные пользовательские функции работают с целыми наборами/столбцами данных с гибкими отношениями ввода-вывода (1:N или многие:многие).

Пакетные векторизованные функции pandas, строго относящиеся к сеансу, могут быть следующих типов:

  • Серия к серии
  • Итератор серии в итератор серии
  • Итератор нескольких рядов в итератор серии
  • Серия к скаляру

Пример UDF pandas для преобразования Series в Series приведен ниже.

from pyspark.sql.functions import pandas_udf
import pandas as pd

df = spark.createDataFrame([(70, 1.75), (80, 1.80), (60, 1.65)], ["Weight", "Height"])

@pandas_udf("double")
def calculate_bmi_pandas(weight: pd.Series, height: pd.Series) -> pd.Series:
  return weight / (height ** 2)

df.withColumn("BMI", calculate_bmi_pandas(df["Weight"], df["Height"])).display()

См. определяемые пользователем функции Pandas.

UDAF

UDAFs работают с несколькими строками и возвращают один агрегированный результат. UDAFs ограничены только областью сеанса.

В следующем примере UDAF суммируются оценки по длине имени.

from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd

# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
  return scores.sum()

# Group by name length and aggregate
result_df = (df.groupBy("name_length")
  .agg(total_score_udf(df["score"]).alias("total_score")))

display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
|      3      |     70.0    |
|      4      |     40.0    |
|      5      |     40.0    |
+-------------+-------------+

Обратитесь к пользовательским функциям pandas для Python и определяемым пользователем агрегатным функциям для Scala.

Определяемые пользователем функции

UDTF принимает один или несколько входных аргументов и возвращает несколько строк (и, возможно, несколько столбцов) для каждой входной строки. Пользовательские табличные функции (UDTFs) ограничены только областью сеанса.

В следующем примере каждое значение в столбце оценки соответствует списку категорий. UDTF разбивает разделенный запятыми список на несколько строк.

+-------+-------+-----------------+
| name  | score |   categories    |
+-------+-------+-----------------+
| alice |  10.0 |  math,science   |
|  bob  |  20.0 |  history,math   |
| carol |  30.0 | science,history |
| dave  |  40.0 |    math,art     |
|  eve  |  50.0 |  science,art    |
+-------+-------+-----------------+
from pyspark.sql.functions import udtf

@udtf(returnType="name: string, score: int, category: string")
class ScoreCategoriesUDTF:
  def eval(self, name: str, score: int, categories: str):
    category_list = categories.split(',')
    for category in category_list:
      yield (name, score, category)

ScoreCategoriesUDTF(lit("Alice"), lit(85), lit("Math,Science,English")).display()
+-------+-------+----------+
| name  | score | category |
+-------+-------+----------+
| alice |  10.0 |   math   |
| alice |  10.0 | science  |
|  bob  |  20.0 | history  |
|  bob  |  20.0 |   math   |
| carol |  30.0 | science  |
| carol |  30.0 | history  |
| dave  |  40.0 |   math   |
| dave  |  40.0 |   art    |
|  eve  |  50.0 | science  |
|  eve  |  50.0 |   art    |
+-------+-------+----------+

См. определяемые пользователем табличные функции Python (UDTFs).

Управляемые Каталогом Unity и относящиеся к области сеанса UDF (пользовательские функции)

Определяемые пользователем функции (UDF) каталога Unity и пакетные функции (Batch UDF) каталога Unity сохраняются в каталоге Unity для улучшения управляемости, повторного использования и возможности обнаружения. Все остальные определяемые пользователем функции сеансовые, что означает, что они определены в записной книжке или задании и ограничиваются текущим SparkSession. Вы можете определить и получить доступ к определяемой пользователем области сеанса с помощью Scala или Python.

Каталог Unity: памятка по управляемым пользовательским функциям (UDF)

Каталог Unity с управляемыми UDF (определяемыми пользователем функциями) позволяет определять, использовать, безопасно делиться и управлять этими функциями в вычислительных средах. См. функции, определяемые пользователем (UDF), в каталоге Unity.

Тип UDF Поддерживаемые вычислительные ресурсы Описание
Python UDF каталога Unity
  • Бессерверные записные книжки и задания
  • Классические вычисления с стандартным режимом доступа (Databricks Runtime 13.3 LTS и более поздней версии)
  • Хранилища SQL (бессерверные, профессиональные и классические)
  • Декларативные конвейеры Lakeflow (классические и бессерверные)
Определите UDF в Python и зарегистрируйте его в каталоге Unity для управления.
Скалярные UDF работают с одной строкой и возвращают однозначное результирующее значение для каждой строки.
UDF пакетного каталога Unity
  • Бессерверные записные книжки и задания
  • Классические вычисления с стандартным режимом доступа (Databricks Runtime 16.3 и более поздних версий)
  • Хранилище SQL (бессерверный, профессиональный и классический)
Определите UDF в Python и зарегистрируйте его в каталоге Unity для управления.
Пакетные операции с несколькими значениями и возврат нескольких значений. Снижает издержки на построчные операции для обработки крупномасштабных данных.

Памятка по определяемой пользователем области сеанса для изолированных пользователем вычислений

Определяемые пользователем функции, привязанные к сеансу, определяются в записной книжке или задании и относятся к текущему SparkSession. Вы можете определить и получить доступ к определяемой пользователем области сеанса с помощью Scala или Python.

Тип UDF Поддерживаемые вычислительные ресурсы Описание
Python скаляр
  • Бессерверные записные книжки и задания
  • Классические вычисления с стандартным режимом доступа (Databricks Runtime 13.3 LTS и более поздней версии)
  • Декларативные конвейеры Lakeflow (классические и бессерверные)
Скалярные UDF работают с одной строкой и возвращают однозначное результирующее значение для каждой строки.
Python нескалярный
  • Бессерверные записные книжки и задания
  • Классические вычисления с стандартным режимом доступа (Databricks Runtime 14.3 LTS и выше)
  • Декларативные конвейеры Lakeflow (классические и бессерверные)
Нескалярные определяемые пользователем функции включают pandas_udf, mapInPandas, mapInArrow, applyInPandas. Пользовательские функции (UDFs) Pandas используют Apache Arrow для передачи данных и Pandas для работы с данными. Пользовательские функции Pandas (UDF) поддерживают векторные операции, которые могут значительно улучшить производительность по сравнению со скалярными пользовательскими функциями, обрабатывающими строки.
Определяемые пользователем функции Python
  • Бессерверные записные книжки и задания
  • Классические вычисления с стандартным режимом доступа (Databricks Runtime 14.3 LTS и выше)
  • Декларативные конвейеры Lakeflow (классические и бессерверные)
UDTF принимает один или несколько входных аргументов и возвращает несколько строк (и, возможно, несколько столбцов) для каждой входной строки.
Скалярные пользовательские функции Scala
  • Классические вычисления с стандартным режимом доступа (Databricks Runtime 13.3 LTS и более поздней версии)
Скалярные UDF работают с одной строкой и возвращают однозначное результирующее значение для каждой строки.
Scala UDAFs
  • Классические вычисления с стандартным режимом доступа (Databricks Runtime 14.2 LTS и выше)
UDAFs работают с несколькими строками и возвращают один агрегированный результат.

Рекомендации по производительности

  • Встроенные функции и пользовательские функции SQL являются наиболее эффективными вариантами.

  • Масштабируемые определяемые пользователем функции обычно быстрее, чем определяемые пользователем Функции Python.

    • Несоизолированные UDF функции Scala выполняются в виртуальной машине Java (JVM), поэтому они позволяют избежать затрат на перемещение данных в JVM и из него.
    • Изолированные пользовательские функции Scala должны перемещать данные в и из JVM, но они по-прежнему могут быть быстрее, чем функции Python UDF, так как они более эффективно обрабатывают память.
  • Определяемые пользователем функции Python и функции pandas обычно работают медленнее, чем функции Scala, потому что им нужно сериализовать данные и перемещать их из JVM в интерпретатор Python.

    • Пользовательские функции Pandas в 100 раз быстрее, чем функции Python, так как они используют Apache Arrow для снижения затрат на сериализацию.