Что такое определяемые пользователем функции (определяемые пользователем функции)?
Определяемые пользователем функции позволяют повторно использовать и совместно использовать код, расширяющий встроенные функции в Azure Databricks. Используйте определяемые пользователем функции для выполнения определенных задач, таких как сложные вычисления, преобразования или пользовательские манипуляции с данными.
Примечание.
В кластерах с режимом общего доступа скалярные определяемые пользователем функции Python поддерживаются в Databricks Runtime 13.3 LTS и более поздних версиях, а Скала UDFs поддерживается в Databricks Runtime 14.2 и выше.
Скалярные определяемые пользователем Функции Python можно зарегистрировать в каталоге Unity с помощью синтаксиса SQL в Databricks Runtime 13.3 LTS и более поздних версий. См . определяемые пользователем функции в каталоге Unity.
Когда следует использовать UDF?
Используйте определяемые пользователем функции Для логики, которую сложно выразить со встроенными функциями Apache Spark. Встроенные функции Apache Spark оптимизированы для распределенной обработки и обычно обеспечивают более высокую производительность в масштабе. См. дополнительные сведения о функциях.
Databricks рекомендует определяемые пользователем функции для нерегламентированных запросов, ручной очистки данных, анализа и операций с небольшими и средними наборами данных. Распространенные варианты использования для определяемых пользователем функций включают шифрование данных и расшифровку, хэширование, синтаксический анализ JSON и проверку.
Используйте методы Apache Spark для операций с очень большими наборами данных и любыми рабочими нагрузками, которые выполняются регулярно или непрерывно, включая задания ETL и операции потоковой передачи.
Зарегистрированные и ограниченные определяемые пользователем сеансы
Определяемые пользователем функции, созданные с помощью SQL, регистрируются в каталоге Unity и имеют связанные разрешения, в то время как определяемые пользователем функции, созданные в записной книжке, основаны на сеансах и применяются к текущему приложению SparkSession.
Вы можете определить и получить доступ к пользовательским файлам на основе сеансов с помощью любого языка, поддерживаемого Azure Databricks. Определяемые пользователем функции могут быть скалярными или не скалярными.
Примечание.
В настоящее время в DBSQL доступны только скалярные определяемые пользователем SQL и Python, зарегистрированные в каталоге Unity.
Определяемые пользователем скалярные функции
Скалярные определяемые пользователем функции работают с одной строкой и возвращают одно значение для каждой строки. В следующем примере используется скалярная UDF для вычисления длины каждого имени в столбце name
и добавления значения в новый столбец name_length
:
+-------+-------+
| name | score |
+-------+-------+
| alice | 10.0 |
| bob | 20.0 |
| carol | 30.0 |
| dave | 40.0 |
| eve | 50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);
-- Use the UDF in a SQL query
SELECT name, get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name | score | name_length |
+-------+-------+-------------+
| alice | 10.0 | 5 |
| bob | 20.0 | 3 |
| carol | 30.0 | 5 |
| dave | 40.0 | 4 |
| eve | 50.0 | 3 |
+-------+-------+-------------+
Для реализации этого в записной книжке Databricks с помощью PySpark выполните указанные ниже действия.
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
@udf(returnType=IntegerType())
def get_name_length(name):
return len(name)
df = df.withColumn("name_length", get_name_length(df.name))
# Show the result
display(df)
Дополнительные сведения см. в разделе "Определяемые пользователем функции" в каталоге Unity и определяемых пользователем скалярных функциях — Python.
Определяемые пользователем агрегатные функции (UDAF)
Определяемые пользователем агрегатные функции (UDAFs) работают с несколькими строками и возвращают один агрегированный результат. В следующем примере определяется UDAF, которая объединяет оценки.
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd
# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
return scores.sum()
# Group by name length and aggregate
result_df = (df.groupBy("name_length")
.agg(total_score_udf(df["score"]).alias("total_score")))
display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
| 3 | 70.0 |
| 4 | 40.0 |
| 5 | 40.0 |
+-------------+-------------+
Ознакомьтесь с пользовательскими функциями pandas для функций python и определяемых пользователем агрегатных функций — Scala.
Определяемые пользователем функции таблиц Python (определяемые пользователем функции)
Внимание
Эта функция предоставляется в режиме общедоступной предварительной версии.
Примечание.
Определяемые пользователем функции Python доступны в Databricks Runtime 14.3 LTS и более поздних версиях.
Определяемые пользователем функции таблиц Python (UDTFs) могут возвращать несколько строк и столбцов для каждой входной строки. В следующем примере каждое значение в столбце оценки соответствует списку категорий. UDTF определяется для разделения списка запятых на несколько строк. См. определяемые пользователем функции таблиц Python (определяемые пользователем функции )
+-------+-------+-----------------+
| name | score | categories |
+-------+-------+-----------------+
| alice | 10.0 | math,science |
| bob | 20.0 | history,math |
| carol | 30.0 | science,history |
| dave | 40.0 | math,art |
| eve | 50.0 | science,art |
+-------+-------+-----------------+
from pyspark.sql.functions import udtf
@udtf(returnType="score: int, categories: string, name: string")
class ScoreCategoriesUDTF:
def eval(self, name: str, score: float, categories: str):
category_list = categories.split(',')
for category in category_list:
yield (name, score, category)
# Apply the UDTF
result_df = df.select(ScoreCategoriesUDTF(df.score, df.categories, df.name))
display(result_df)
+-------+-------+----------+
| name | score | category |
+-------+-------+----------+
| alice | 10.0 | math |
| alice | 10.0 | science |
| bob | 20.0 | history |
| bob | 20.0 | math |
| carol | 30.0 | science |
| carol | 30.0 | history |
| dave | 40.0 | math |
| dave | 40.0 | art |
| eve | 50.0 | science |
| eve | 50.0 | art |
+-------+-------+----------+
Рекомендации по производительности
- Встроенные функции и определяемые пользователем функции SQL являются наиболее эффективным вариантом.
- Масштабируемые определяемые пользователем функции обычно выполняются быстрее, так как они выполняются на виртуальной машине Java (JVM) и позволяют избежать затрат на перемещение данных в JVM и из него.
- Определяемые пользователем функции Python и Pandas, как правило, медленнее, чем UDFs Scala, так как они требуют сериализации и перемещения данных из JVM в интерпретатор Python. Пользовательские функции Pandas до 100x быстрее, чем определяемые пользователем Python, так как они используют Apache Arrow для снижения затрат на сериализацию.