Определяемые пользователем скалярные функции — Python
В этой статье приведены примеры определяемых пользователем функций Python. В нем показано, как зарегистрировать определяемые пользователем функции, как вызвать определяемые пользователем функции и предоставить предостережения о порядке вычисления вложенных выражений в Spark SQL.
В Databricks Runtime 14.0 и более поздних версиях можно использовать определяемые пользователем функции таблиц Python для регистрации функций, возвращающих все отношения вместо скалярных значений. См. сведения о определяемых пользователем функциях таблиц Python (UDTFs).
Примечание.
В Databricks Runtime 12.2 LTS и ниже определяемые пользователем функции Python и Pandas не поддерживаются в вычислительных ресурсах каталога Unity, использующих режим общего доступа. Скалярные ОПРЕДЕЛяемые пользователем Python и Pandas поддерживаются в Databricks Runtime 13.3 LTS и более поздних версиях для всех режимов доступа.
В Databricks Runtime 13.3 LTS и более поздних версиях можно зарегистрировать скалярные пользовательские файлы Python в каталоге Unity с помощью синтаксиса SQL. См . определяемые пользователем функции в каталоге Unity.
Регистрация определяемой пользователем функции
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
При необходимости можно задать тип возвращаемого значения для определяемой пользователем функции. По умолчанию это StringType
.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Вызов определяемой пользователем функции в Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
Использование определяемой пользователем функции с кадрами данных
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Вы также можете объявить определяемую пользователем функцию с помощью синтаксиса аннотации:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Порядок вычисления и проверка значений NULL
В SQL Spark (включая SQL, а также API кадров и наборов данных) не гарантируется определенный порядок вычисления частей выражения. В частности, входные данные оператора или функции не обязательно вычисляются слева направо или в любом другом фиксированном порядке. Например, логические выражения AND
и OR
не имеют привычной семантики слева направо.
Таким образом, не следует полагаться на побочные эффекты или порядок вычисления логических выражений, а также порядок предложений WHERE
и HAVING
, так как этот порядок и правила применения предложений могут изменяться в результате оптимизации или планирования запросов. В частности, если определяемая пользователем функция использует привычную семантику в SQL для проверки значений NULL, нет гарантии того, что эта проверка произойдет перед вызовом функции. Например,
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
В этом примере выражение WHERE
не гарантирует, что определяемая пользователем функция strlen
будет вызываться после фильтрации значений NULL.
Для правильной проверки значений NULL рекомендуется выполнить одно из следующих действий:
- Реализуйте в функции поддержку значения NULL и выполняйте проверку этих значений внутри самой функции.
- Используйте выражения
IF
илиCASE WHEN
для проверки значения NULL и вызова определяемой пользователем функции в условной ветви.
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Ограничения
- Пользовательские функции PySpark в общих кластерах или бессерверных вычислительных ресурсах не могут получить доступ к папкам Git, файлам рабочей области или томам UC для импорта модулей в Databricks Runtime 14.2 и ниже.