Поделиться через


Что такое определяемые пользователем функции (определяемые пользователем функции)?

Определяемые пользователем функции позволяют повторно использовать и совместно использовать код, расширяющий встроенные функции в Azure Databricks. Используйте определяемые пользователем функции для выполнения определенных задач, таких как сложные вычисления, преобразования или пользовательские манипуляции с данными.

Примечание.

В кластерах с режимом общего доступа скалярные определяемые пользователем функции Python поддерживаются в Databricks Runtime 13.3 LTS и более поздних версиях, а Скала UDFs поддерживается в Databricks Runtime 14.2 и выше.

Скалярные определяемые пользователем Функции Python можно зарегистрировать в каталоге Unity с помощью синтаксиса SQL в Databricks Runtime 13.3 LTS и более поздних версий. См . определяемые пользователем функции в каталоге Unity.

Когда следует использовать UDF?

Используйте определяемые пользователем функции Для логики, которую сложно выразить со встроенными функциями Apache Spark. Встроенные функции Apache Spark оптимизированы для распределенной обработки и обычно обеспечивают более высокую производительность в масштабе. См. дополнительные сведения о функциях.

Databricks рекомендует определяемые пользователем функции для нерегламентированных запросов, ручной очистки данных, анализа и операций с небольшими и средними наборами данных. Распространенные варианты использования для определяемых пользователем функций включают шифрование данных и расшифровку, хэширование, синтаксический анализ JSON и проверку.

Используйте методы Apache Spark для операций с очень большими наборами данных и любыми рабочими нагрузками, которые выполняются регулярно или непрерывно, включая задания ETL и операции потоковой передачи.

Зарегистрированные и ограниченные определяемые пользователем сеансы

Определяемые пользователем функции, созданные с помощью SQL, регистрируются в каталоге Unity и имеют связанные разрешения, в то время как определяемые пользователем функции, созданные в записной книжке, основаны на сеансах и применяются к текущему приложению SparkSession.

Вы можете определить и получить доступ к пользовательским файлам на основе сеансов с помощью любого языка, поддерживаемого Azure Databricks. Определяемые пользователем функции могут быть скалярными или не скалярными.

Примечание.

В настоящее время в DBSQL доступны только скалярные определяемые пользователем SQL и Python, зарегистрированные в каталоге Unity.

Определяемые пользователем скалярные функции

Скалярные определяемые пользователем функции работают с одной строкой и возвращают одно значение для каждой строки. В следующем примере используется скалярная UDF для вычисления длины каждого имени в столбце name и добавления значения в новый столбец name_length:

+-------+-------+
| name  | score |
+-------+-------+
| alice |  10.0 |
| bob   |  20.0 |
| carol |  30.0 |
| dave  |  40.0 |
| eve   |  50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);

-- Use the UDF in a SQL query
SELECT name, get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name  | score | name_length |
+-------+-------+-------------+
| alice |  10.0 |      5      |
|  bob  |  20.0 |      3      |
| carol |  30.0 |      5      |
| dave  |  40.0 |      4      |
|  eve  |  50.0 |      3      |
+-------+-------+-------------+

Для реализации этого в записной книжке Databricks с помощью PySpark выполните указанные ниже действия.

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(returnType=IntegerType())
def get_name_length(name):
   return len(name)

df = df.withColumn("name_length", get_name_length(df.name))

# Show the result
display(df)

Дополнительные сведения см. в разделе "Определяемые пользователем функции" в каталоге Unity и определяемых пользователем скалярных функциях — Python.

Определяемые пользователем агрегатные функции (UDAF)

Определяемые пользователем агрегатные функции (UDAFs) работают с несколькими строками и возвращают один агрегированный результат. В следующем примере определяется UDAF, которая объединяет оценки.

from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd

# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
    return scores.sum()

# Group by name length and aggregate
result_df = (df.groupBy("name_length")
              .agg(total_score_udf(df["score"]).alias("total_score")))

display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
|      3      |     70.0    |
|      4      |     40.0    |
|      5      |     40.0    |
+-------------+-------------+

Ознакомьтесь с пользовательскими функциями pandas для функций python и определяемых пользователем агрегатных функций — Scala.

Определяемые пользователем функции таблиц Python (определяемые пользователем функции)

Внимание

Эта функция предоставляется в режиме общедоступной предварительной версии.

Примечание.

Определяемые пользователем функции Python доступны в Databricks Runtime 14.3 LTS и более поздних версиях.

Определяемые пользователем функции таблиц Python (UDTFs) могут возвращать несколько строк и столбцов для каждой входной строки. В следующем примере каждое значение в столбце оценки соответствует списку категорий. UDTF определяется для разделения списка запятых на несколько строк. См. определяемые пользователем функции таблиц Python (определяемые пользователем функции ).

+-------+-------+-----------------+
| name  | score |   categories    |
+-------+-------+-----------------+
| alice |  10.0 |  math,science   |
|  bob  |  20.0 |  history,math   |
| carol |  30.0 | science,history |
| dave  |  40.0 |    math,art     |
|  eve  |  50.0 |  science,art    |
+-------+-------+-----------------+

from pyspark.sql.functions import udtf

@udtf(returnType="score: int, categories: string, name: string")
class ScoreCategoriesUDTF:
    def eval(self, name: str, score: float, categories: str):
        category_list = categories.split(',')
        for category in category_list:
            yield (name, score, category)

# Apply the UDTF
result_df = df.select(ScoreCategoriesUDTF(df.score, df.categories, df.name))
display(result_df)
+-------+-------+----------+
| name  | score | category |
+-------+-------+----------+
| alice |  10.0 |   math   |
| alice |  10.0 | science  |
|  bob  |  20.0 | history  |
|  bob  |  20.0 |   math   |
| carol |  30.0 | science  |
| carol |  30.0 | history  |
| dave  |  40.0 |   math   |
| dave  |  40.0 |   art    |
|  eve  |  50.0 | science  |
|  eve  |  50.0 |   art    |
+-------+-------+----------+

Рекомендации по производительности