Поделиться через


Определяемые пользователем скалярные функции — Python

В этой статье приведены примеры определяемых пользователем функций Python. В нём показано, как зарегистрировать определяемые пользователем функции (UDF), как их вызывать, и предоставляются предостережения о порядке вычисления вложенных выражений в Spark SQL.

В Databricks Runtime 14.0 и более поздних версиях можно использовать определяемые пользователем функции таблиц Python для регистрации функций, возвращающих все отношения вместо скалярных значений. См. определяемые пользователем табличные функции Python (UDTFs).

Примечание.

В Databricks Runtime 12.2 LTS и ниже, Python UDF и Pandas UDF не поддерживаются на компьютерах Unity Catalog, использующих стандартный режим доступа. Скалярные пользовательские функции и функции Pandas на Python поддерживаются в Databricks Runtime 13.3 LTS и более поздних версиях для всех режимов доступа.

В Databricks Runtime 13.3 LTS и более поздних версиях можно зарегистрировать скалярные пользовательские файлы Python в каталоге Unity с помощью синтаксиса SQL. См. функции, определяемые пользователем (UDF), в каталоге Unity.

Зарегистрировать функцию как ODF

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

При желании можно задать тип возвращаемого значения вашей UDF. Тип возврата по умолчанию — это StringType.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Вызовите UDF в Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

Использование UDF с DataFrame

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Вы также можете объявить определяемую пользователем функцию с помощью синтаксиса аннотации:

from pyspark.sql.functions import udf

@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Варианты с UDF

Тип PySpark для variant — VariantType, а значения имеют тип VariantVal. Сведения о вариантах см. в разделе "Запрос вариантов данных".

from pyspark.sql.types import VariantType

# Return Variant
@udf(returnType = VariantType())
def toVariant(jsonString):
  return VariantVal.parseJson(jsonString)

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toVariant(col("json"))).display()
+---------------+
|toVariant(json)|
+---------------+
|        {"a":1}|
+---------------+
# Return Struct<Variant>
@udf(returnType = StructType([StructField("v", VariantType(), True)]))
def toStructVariant(jsonString):
  return {"v": VariantVal.parseJson(jsonString)}

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toStructVariant(col("json"))).display()
+---------------------+
|toStructVariant(json)|
+---------------------+
|        {"v":{"a":1}}|
+---------------------+
# Return Array<Variant>
@udf(returnType = ArrayType(VariantType()))
def toArrayVariant(jsonString):
  return [VariantVal.parseJson(jsonString)]

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+--------------------+
|toArrayVariant(json)|
+--------------------+
|           [{"a":1}]|
+--------------------+
# Return Map<String, Variant>
@udf(returnType = MapType(StringType(), VariantType(), True))
def toArrayVariant(jsonString):
  return {"v1": VariantVal.parseJson(jsonString), "v2": VariantVal.parseJson("[" + jsonString + "]")}

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+-----------------------------+
|         toArrayVariant(json)|
+-----------------------------+
|{"v2":[{"a":1}],"v1":{"a":1}}|
+-----------------------------+

Порядок вычисления и проверка значений NULL

В Spark SQL (включая SQL, а также API DataFrame и Dataset) не гарантируется определенный порядок вычисления подвыражений. В частности, входные данные оператора или функции не обязательно вычисляются слева направо или в любом другом фиксированном порядке. Например, логические выражения AND и OR не обладают семантикой "короткого замыкания", при котором вычисления происходят слева направо.

Таким образом, не следует полагаться на побочные эффекты или порядок вычисления логических выражений, а также на порядок предложений WHERE и HAVING, так как эти выражения и порядок предложений могут изменяться в результате оптимизации и планирования запросов. В частности, если UDF использует семантику короткого замыкания в SQL для проверки на null, нет гарантии, что эта проверка будет выполнена до вызова UDF. Например,

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

В этом примере выражение WHERE не гарантирует, что определяемая пользователем функция strlen будет вызываться после фильтрации значений NULL.

Для правильной проверки значений NULL рекомендуется выполнить одно из следующих действий:

  • Сделайте UDF способной обрабатывать значения NULL и выполняйте проверку на NULL внутри самой UDF.
  • Используйте выражения IF или CASE WHEN для проверки значения NULL и вызова определяемой пользователем функции в условной ветви.
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

Учетные данные службы в скалярных пользовательских файлах Python

Скалярные пользовательские функции Python могут использовать учетные данные службы каталога Unity для безопасного доступа к внешним облачным службам. Это полезно для интеграции таких операций, как облачная маркеризация, шифрование или управление секретами непосредственно в преобразования данных.

Учетные данные службы для скалярных пользовательских файлов Python поддерживаются только в хранилище SQL и общих вычислительных ресурсах.

Примечание.

Учетные данные службы в скалярных пользовательских файлах Python требуют Databricks Runtime 17.1 и более поздних версий.

Сведения о создании учетных данных службы см. в разделе "Создание учетных данных службы".

Примечание.

API, специфичный для UDF, для учетных данных службы:
В пользовательских функциях используйте databricks.service_credentials.getServiceCredentialsProvider() для доступа к учетным данным службы.

Это отличается от функции, используемой dbutils.credentials.getServiceCredentialsProvider() в записных книжках, которая недоступна в контекстах выполнения UDF.

Чтобы получить доступ к учетным данным службы, используйте databricks.service_credentials.getServiceCredentialsProvider() программу в логике UDF для инициализации облачных пакетов SDK с соответствующими учетными данными. Весь код должен быть инкапсулирован в тексте UDF.

@udf
def use_service_credential():
    from azure.mgmt.web import WebSiteManagementClient

    # Assuming there is a service credential named 'testcred' set up in Unity Catalog
    web_client = WebSiteManagementClient(subscription_id, credential = getServiceCredentialsProvider('testcred'))
    # Use web_client to perform operations

Разрешения учетных данных службы

Создатель UDF должен иметь разрешение ACCESS на учетные данные службы каталога Unity.

Определяемые пользователем функции, которые выполняются в области No-PE, также известные как выделенные кластеры, требуют разрешения MANAGE для учетных данных службы.

Учетные данные по умолчанию

При использовании в скалярных пользовательских файлах Python Databricks автоматически использует учетные данные службы по умолчанию из переменной вычислительной среды. Это поведение позволяет безопасно ссылаться на внешние службы без явного управления псевдонимами учетных данных в коде UDF. См . раздел "Указание учетных данных службы по умолчанию" для вычислительного ресурса

Поддержка учетных данных по умолчанию доступна только в кластерах стандартного и выделенного режима доступа. Она недоступна в DBSQL.

Для использования поставщика azure-identity необходимо установить пакет DefaultAzureCredential. Чтобы установить пакет, ознакомьтесь с библиотеками Python с областью записной книжки или библиотеками с областью вычислений.

@udf
def use_service_credential():
    from azure.identity import DefaultAzureCredential
    from azure.mgmt.web import WebSiteManagementClient

    # DefaultAzureCredential is automatically using the default service credential for the compute
    web_client_default = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)

    # Use web_client to perform operations

Получить контекст выполнения задачи

Используйте TaskContext PySpark API, чтобы получить информацию о контексте, такую как идентификация пользователя, метки кластера, идентификатор задания Spark и многое другое. См. Получение контекста задачи в UDF.

Ограничения

Следующие ограничения применяются к UDF PySpark.

  • Ограничения доступа к файлам: В Databricks Runtime 14.2 и ниже пользовательские файлы PySpark в общих кластерах не могут получить доступ к папкам Git, файлам рабочей области или томам каталога Unity.

  • Широковещательные переменные: UDF PySpark в кластерах со стандартным режимом доступа и бессерверных вычислениях не поддерживают широковещательные переменные.

  • Учетные данные службы: Учетные данные службы доступны только в пользовательских файлах каталога Batch Unity и Скалярных определяемых пользователем Python. Они не поддерживаются в стандартных UDFS каталога Unity.

  • Учетные данные службы: учетные данные службы доступны только в бессерверных вычислениях при использовании бессерверной среды версии 3 или выше. См. версии бессерверных сред .

  • Ограничение памяти на бессерверных: UDF PySpark на бессерверных вычислительных ресурсах имеют ограничение памяти в 1 ГБ на каждую функцию UDF PySpark. Превышение этого ограничения приводит к ошибке типа UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT_SERVERLESS.
  • Ограничение памяти в стандартном режиме доступа: пользовательские функции PySpark в стандартном режиме доступа имеют ограничение памяти на основе доступной памяти выбранного типа экземпляра. Превышение доступной памяти приводит к ошибке типа UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT.