Примечание
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Определяемые пользователем функции позволяют повторно использовать и совместно использовать код, расширяющий встроенные функции в Azure Databricks. Используйте функции, определяемые пользователем, для выполнения специфических задач, таких как сложные вычисления, преобразования или кастомизация данных.
Когда следует использовать функцию UDF и Apache Spark?
Используйте UDF (определяемые пользователем функции) для логики, которую сложно выразить с помощью встроенных функций Apache Spark. Встроенные функции Apache Spark оптимизированы для распределенной обработки и обеспечивают более высокую производительность в масштабе. См. дополнительные сведения о функциях.
Databricks рекомендует пользовательские функции для нерегламентированных запросов, ручной очистки данных, а также анализа и операций с небольшими и средними наборами данных. Обычные случаи использования UDF включают шифрование и расшифровку данных, хэширование, анализ JSON и валидацию.
Используйте методы Apache Spark для операций с очень большими наборами данных и любых рабочих нагрузок, выполняемых регулярно или непрерывно, включая задания ETL и операции потоковой передачи.
Общие сведения о типах UDF
Выберите тип UDF на следующих вкладках, чтобы просмотреть описание, пример и ссылку, чтобы узнать больше.
Скалярная пользовательская функция (UDF)
Скалярные UDF работают с одной строкой и возвращают однозначное результирующее значение для каждой строки. Они могут быть под управлением каталога Unity или иметь область действия сеанса.
В следующем примере используется скалярная UDF для вычисления длины каждого имени в столбце name
и добавления значения в новый столбец name_length
.
+-------+-------+
| name | score |
+-------+-------+
| alice | 10.0 |
| bob | 20.0 |
| carol | 30.0 |
| dave | 40.0 |
| eve | 50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION main.test.get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);
-- Use the UDF in a SQL query
SELECT name, main.test.get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name | score | name_length |
+-------+-------+-------------+
| alice | 10.0 | 5 |
| bob | 20.0 | 3 |
| carol | 30.0 | 5 |
| dave | 40.0 | 4 |
| eve | 50.0 | 3 |
+-------+-------+-------------+
Для реализации этого в записной книжке Databricks с помощью PySpark выполните указанные ниже действия.
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
@udf(returnType=IntegerType())
def get_name_length(name):
return len(name)
df = df.withColumn("name_length", get_name_length(df.name))
# Show the result
display(df)
См. определяемые пользователем функции (UDF) в каталоге Unity и определяемые пользователем скалярные функции на Python.
Пакетные скалярные пользовательские функции
Обработка данных в пакетах при сохранении четности входных и выходных строк 1:1. Это снижает накладные расходы на операции с отдельными строками при обработке крупномасштабных данных. Пакетные функции, определяемые пользователем (UDF), также поддерживают сохранение состояния между пакетами, что позволяет более эффективно выполнять задачи, повторно использовать ресурсы и обрабатывать сложные вычисления, для которых требуется целостный контекст в блоках данных.
Они могут быть под управлением каталога Unity или иметь область действия сеанса.
Следующая функция UDF каталога Unity для пакетной обработки в Python вычисляет индекс массы тела (BMI) в процессе обработки пакетов строк.
+-------------+-------------+
| weight_kg | height_m |
+-------------+-------------+
| 90 | 1.8 |
| 77 | 1.6 |
| 50 | 1.5 |
+-------------+-------------+
%sql
CREATE OR REPLACE FUNCTION main.test.calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for weight_series, height_series in batch_iter:
yield weight_series / (height_series ** 2)
$$;
select main.test.calculate_bmi_pandas(cast(70 as double), cast(1.8 as double));
+--------+
| BMI |
+--------+
| 27.8 |
| 30.1 |
| 22.2 |
+--------+
См. определяемые пользователем функции (UDF) в каталоге Unity и пакетные функции Python (UDF) в каталоге Unity.
Нескалярные определяемые пользователем функции
Нескалярные пользовательские функции работают с целыми наборами/столбцами данных с гибкими отношениями ввода-вывода (1:N или многие:многие).
Пакетные векторизованные функции pandas, строго относящиеся к сеансу, могут быть следующих типов:
- Серия к серии
- Итератор серии в итератор серии
- Итератор нескольких рядов в итератор серии
- Серия к скаляру
Пример UDF pandas для преобразования Series в Series приведен ниже.
from pyspark.sql.functions import pandas_udf
import pandas as pd
df = spark.createDataFrame([(70, 1.75), (80, 1.80), (60, 1.65)], ["Weight", "Height"])
@pandas_udf("double")
def calculate_bmi_pandas(weight: pd.Series, height: pd.Series) -> pd.Series:
return weight / (height ** 2)
df.withColumn("BMI", calculate_bmi_pandas(df["Weight"], df["Height"])).display()
См. определяемые пользователем функции Pandas.
UDAF
UDAFs работают с несколькими строками и возвращают один агрегированный результат. UDAFs ограничены только областью сеанса.
В следующем примере UDAF суммируются оценки по длине имени.
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd
# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
return scores.sum()
# Group by name length and aggregate
result_df = (df.groupBy("name_length")
.agg(total_score_udf(df["score"]).alias("total_score")))
display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
| 3 | 70.0 |
| 4 | 40.0 |
| 5 | 40.0 |
+-------------+-------------+
Обратитесь к пользовательским функциям pandas для Python и определяемым пользователем агрегатным функциям для Scala.
Определяемые пользователем функции
UDTF принимает один или несколько входных аргументов и возвращает несколько строк (и, возможно, несколько столбцов) для каждой входной строки. Пользовательские табличные функции (UDTFs) ограничены только областью сеанса.
В следующем примере каждое значение в столбце оценки соответствует списку категорий. UDTF разбивает разделенный запятыми список на несколько строк.
+-------+-------+-----------------+
| name | score | categories |
+-------+-------+-----------------+
| alice | 10.0 | math,science |
| bob | 20.0 | history,math |
| carol | 30.0 | science,history |
| dave | 40.0 | math,art |
| eve | 50.0 | science,art |
+-------+-------+-----------------+
from pyspark.sql.functions import udtf
@udtf(returnType="name: string, score: int, category: string")
class ScoreCategoriesUDTF:
def eval(self, name: str, score: int, categories: str):
category_list = categories.split(',')
for category in category_list:
yield (name, score, category)
ScoreCategoriesUDTF(lit("Alice"), lit(85), lit("Math,Science,English")).display()
+-------+-------+----------+
| name | score | category |
+-------+-------+----------+
| alice | 10.0 | math |
| alice | 10.0 | science |
| bob | 20.0 | history |
| bob | 20.0 | math |
| carol | 30.0 | science |
| carol | 30.0 | history |
| dave | 40.0 | math |
| dave | 40.0 | art |
| eve | 50.0 | science |
| eve | 50.0 | art |
+-------+-------+----------+
См. определяемые пользователем табличные функции Python (UDTFs).
Управляемые Каталогом Unity и относящиеся к области сеанса UDF (пользовательские функции)
Определяемые пользователем функции (UDF) каталога Unity и пакетные функции (Batch UDF) каталога Unity сохраняются в каталоге Unity для улучшения управляемости, повторного использования и возможности обнаружения. Все остальные определяемые пользователем функции сеансовые, что означает, что они определены в записной книжке или задании и ограничиваются текущим SparkSession. Вы можете определить и получить доступ к определяемой пользователем области сеанса с помощью Scala или Python.
Каталог Unity: памятка по управляемым пользовательским функциям (UDF)
Каталог Unity с управляемыми UDF (определяемыми пользователем функциями) позволяет определять, использовать, безопасно делиться и управлять этими функциями в вычислительных средах. См. функции, определяемые пользователем (UDF), в каталоге Unity.
Тип UDF | Поддерживаемые вычислительные ресурсы | Описание |
---|---|---|
Python UDF каталога Unity |
|
Определите UDF в Python и зарегистрируйте его в каталоге Unity для управления. Скалярные UDF работают с одной строкой и возвращают однозначное результирующее значение для каждой строки. |
UDF пакетного каталога Unity |
|
Определите UDF в Python и зарегистрируйте его в каталоге Unity для управления. Пакетные операции с несколькими значениями и возврат нескольких значений. Снижает издержки на построчные операции для обработки крупномасштабных данных. |
Памятка по определяемой пользователем области сеанса для изолированных пользователем вычислений
Определяемые пользователем функции, привязанные к сеансу, определяются в записной книжке или задании и относятся к текущему SparkSession. Вы можете определить и получить доступ к определяемой пользователем области сеанса с помощью Scala или Python.
Тип UDF | Поддерживаемые вычислительные ресурсы | Описание |
---|---|---|
Python скаляр |
|
Скалярные UDF работают с одной строкой и возвращают однозначное результирующее значение для каждой строки. |
Python нескалярный |
|
Нескалярные определяемые пользователем функции включают pandas_udf , mapInPandas , mapInArrow , applyInPandas . Пользовательские функции (UDFs) Pandas используют Apache Arrow для передачи данных и Pandas для работы с данными. Пользовательские функции Pandas (UDF) поддерживают векторные операции, которые могут значительно улучшить производительность по сравнению со скалярными пользовательскими функциями, обрабатывающими строки. |
Определяемые пользователем функции Python |
|
UDTF принимает один или несколько входных аргументов и возвращает несколько строк (и, возможно, несколько столбцов) для каждой входной строки. |
Скалярные пользовательские функции Scala |
|
Скалярные UDF работают с одной строкой и возвращают однозначное результирующее значение для каждой строки. |
Scala UDAFs |
|
UDAFs работают с несколькими строками и возвращают один агрегированный результат. |
Рекомендации по производительности
Встроенные функции и пользовательские функции SQL являются наиболее эффективными вариантами.
Масштабируемые определяемые пользователем функции обычно быстрее, чем определяемые пользователем Функции Python.
- Несоизолированные UDF функции Scala выполняются в виртуальной машине Java (JVM), поэтому они позволяют избежать затрат на перемещение данных в JVM и из него.
- Изолированные пользовательские функции Scala должны перемещать данные в и из JVM, но они по-прежнему могут быть быстрее, чем функции Python UDF, так как они более эффективно обрабатывают память.
Определяемые пользователем функции Python и функции pandas обычно работают медленнее, чем функции Scala, потому что им нужно сериализовать данные и перемещать их из JVM в интерпретатор Python.
- Пользовательские функции Pandas в 100 раз быстрее, чем функции Python, так как они используют Apache Arrow для снижения затрат на сериализацию.