Заметка
Доступ к этой странице требует авторизации. Вы можете попробовать войти в систему или изменить каталог.
Доступ к этой странице требует авторизации. Вы можете попробовать сменить директорию.
В этой статье приведены примеры определяемых пользователем функций Python. В нём показано, как зарегистрировать определяемые пользователем функции (UDF), как их вызывать, и предоставляются предостережения о порядке вычисления вложенных выражений в Spark SQL.
В Databricks Runtime 14.0 и более поздних версиях можно использовать определяемые пользователем функции таблиц Python для регистрации функций, возвращающих все отношения вместо скалярных значений. См. определяемые пользователем табличные функции Python (UDTFs).
Примечание.
В Databricks Runtime 12.2 LTS и ниже, Python UDF и Pandas UDF не поддерживаются на компьютерах Unity Catalog, использующих стандартный режим доступа. Скалярные пользовательские функции и функции Pandas на Python поддерживаются в Databricks Runtime 13.3 LTS и более поздних версиях для всех режимов доступа.
В Databricks Runtime 13.3 LTS и более поздних версиях можно зарегистрировать скалярные пользовательские файлы Python в каталоге Unity с помощью синтаксиса SQL. См. функции, определяемые пользователем (UDF), в каталоге Unity.
Зарегистрировать функцию как ODF
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
При желании можно задать тип возвращаемого значения вашей UDF. Тип возврата по умолчанию — это StringType.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Вызовите UDF в Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
Использование UDF с DataFrame
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Вы также можете объявить определяемую пользователем функцию с помощью синтаксиса аннотации:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Варианты с UDF
Тип PySpark для variant — VariantType, а значения имеют тип VariantVal. Сведения о вариантах см. в разделе "Запрос вариантов данных".
from pyspark.sql.types import VariantType
# Return Variant
@udf(returnType = VariantType())
def toVariant(jsonString):
return VariantVal.parseJson(jsonString)
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toVariant(col("json"))).display()
+---------------+
|toVariant(json)|
+---------------+
| {"a":1}|
+---------------+
# Return Struct<Variant>
@udf(returnType = StructType([StructField("v", VariantType(), True)]))
def toStructVariant(jsonString):
return {"v": VariantVal.parseJson(jsonString)}
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toStructVariant(col("json"))).display()
+---------------------+
|toStructVariant(json)|
+---------------------+
| {"v":{"a":1}}|
+---------------------+
# Return Array<Variant>
@udf(returnType = ArrayType(VariantType()))
def toArrayVariant(jsonString):
return [VariantVal.parseJson(jsonString)]
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+--------------------+
|toArrayVariant(json)|
+--------------------+
| [{"a":1}]|
+--------------------+
# Return Map<String, Variant>
@udf(returnType = MapType(StringType(), VariantType(), True))
def toArrayVariant(jsonString):
return {"v1": VariantVal.parseJson(jsonString), "v2": VariantVal.parseJson("[" + jsonString + "]")}
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+-----------------------------+
| toArrayVariant(json)|
+-----------------------------+
|{"v2":[{"a":1}],"v1":{"a":1}}|
+-----------------------------+
Порядок вычисления и проверка значений NULL
В Spark SQL (включая SQL, а также API DataFrame и Dataset) не гарантируется определенный порядок вычисления подвыражений. В частности, входные данные оператора или функции не обязательно вычисляются слева направо или в любом другом фиксированном порядке. Например, логические выражения AND и OR не обладают семантикой "короткого замыкания", при котором вычисления происходят слева направо.
Таким образом, не следует полагаться на побочные эффекты или порядок вычисления логических выражений, а также на порядок предложений WHERE и HAVING, так как эти выражения и порядок предложений могут изменяться в результате оптимизации и планирования запросов. В частности, если UDF использует семантику короткого замыкания в SQL для проверки на null, нет гарантии, что эта проверка будет выполнена до вызова UDF. Например,
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
В этом примере выражение WHERE не гарантирует, что определяемая пользователем функция strlen будет вызываться после фильтрации значений NULL.
Для правильной проверки значений NULL рекомендуется выполнить одно из следующих действий:
- Сделайте UDF способной обрабатывать значения NULL и выполняйте проверку на NULL внутри самой UDF.
- Используйте выражения
IFилиCASE WHENдля проверки значения NULL и вызова определяемой пользователем функции в условной ветви.
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Учетные данные службы в скалярных пользовательских файлах Python
Скалярные пользовательские функции Python могут использовать учетные данные службы каталога Unity для безопасного доступа к внешним облачным службам. Это полезно для интеграции таких операций, как облачная маркеризация, шифрование или управление секретами непосредственно в преобразования данных.
Учетные данные службы для скалярных пользовательских файлов Python поддерживаются только в хранилище SQL и общих вычислительных ресурсах.
Примечание.
Учетные данные службы в скалярных пользовательских файлах Python требуют Databricks Runtime 17.1 и более поздних версий.
Сведения о создании учетных данных службы см. в разделе "Создание учетных данных службы".
Примечание.
API, специфичный для UDF, для учетных данных службы:
В пользовательских функциях используйте databricks.service_credentials.getServiceCredentialsProvider() для доступа к учетным данным службы.
Это отличается от функции, используемой dbutils.credentials.getServiceCredentialsProvider() в записных книжках, которая недоступна в контекстах выполнения UDF.
Чтобы получить доступ к учетным данным службы, используйте databricks.service_credentials.getServiceCredentialsProvider() программу в логике UDF для инициализации облачных пакетов SDK с соответствующими учетными данными. Весь код должен быть инкапсулирован в тексте UDF.
@udf
def use_service_credential():
from azure.mgmt.web import WebSiteManagementClient
# Assuming there is a service credential named 'testcred' set up in Unity Catalog
web_client = WebSiteManagementClient(subscription_id, credential = getServiceCredentialsProvider('testcred'))
# Use web_client to perform operations
Разрешения учетных данных службы
Создатель UDF должен иметь разрешение ACCESS на учетные данные службы каталога Unity.
Определяемые пользователем функции, которые выполняются в области No-PE, также известные как выделенные кластеры, требуют разрешения MANAGE для учетных данных службы.
Учетные данные по умолчанию
При использовании в скалярных пользовательских файлах Python Databricks автоматически использует учетные данные службы по умолчанию из переменной вычислительной среды. Это поведение позволяет безопасно ссылаться на внешние службы без явного управления псевдонимами учетных данных в коде UDF. См . раздел "Указание учетных данных службы по умолчанию" для вычислительного ресурса
Поддержка учетных данных по умолчанию доступна только в кластерах стандартного и выделенного режима доступа. Она недоступна в DBSQL.
Для использования поставщика azure-identity необходимо установить пакет DefaultAzureCredential. Чтобы установить пакет, ознакомьтесь с библиотеками Python с областью записной книжки или библиотеками с областью вычислений.
@udf
def use_service_credential():
from azure.identity import DefaultAzureCredential
from azure.mgmt.web import WebSiteManagementClient
# DefaultAzureCredential is automatically using the default service credential for the compute
web_client_default = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)
# Use web_client to perform operations
Получить контекст выполнения задачи
Используйте TaskContext PySpark API, чтобы получить информацию о контексте, такую как идентификация пользователя, метки кластера, идентификатор задания Spark и многое другое. См. Получение контекста задачи в UDF.
Ограничения
Следующие ограничения применяются к UDF PySpark.
Ограничения доступа к файлам: В Databricks Runtime 14.2 и ниже пользовательские файлы PySpark в общих кластерах не могут получить доступ к папкам Git, файлам рабочей области или томам каталога Unity.
Широковещательные переменные: UDF PySpark в кластерах со стандартным режимом доступа и бессерверных вычислениях не поддерживают широковещательные переменные.
Учетные данные службы: Учетные данные службы доступны только в пользовательских файлах каталога Batch Unity и Скалярных определяемых пользователем Python. Они не поддерживаются в стандартных UDFS каталога Unity.
Учетные данные службы: учетные данные службы доступны только в бессерверных вычислениях при использовании бессерверной среды версии 3 или выше. См. версии бессерверных сред .
- Ограничение памяти на бессерверных: UDF PySpark на бессерверных вычислительных ресурсах имеют ограничение памяти в 1 ГБ на каждую функцию UDF PySpark. Превышение этого ограничения приводит к ошибке типа UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT_SERVERLESS.
- Ограничение памяти в стандартном режиме доступа: пользовательские функции PySpark в стандартном режиме доступа имеют ограничение памяти на основе доступной памяти выбранного типа экземпляра. Превышение доступной памяти приводит к ошибке типа UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT.