Заметка
Доступ к этой странице требует авторизации. Вы можете попробовать войти в систему или изменить каталог.
Доступ к этой странице требует авторизации. Вы можете попробовать сменить директорию.
Это важно
Эта функция доступна в общедоступной предварительной версии.
Определяемые пользователем функции (UDF) каталога Unity для пакетной обработки с помощью Python расширяют возможности стандартных UDF каталога Unity, позволяя писать код на Python для обработки пакетов данных, что значительно повышает эффективность за счет снижения накладных расходов, связанных с обработкой данных по одной строке. Эти оптимизации делают UDF на Python в Unity Catalog оптимальными для обработки данных в крупном масштабе.
Требования
Для пакетных пользовательских функций Unity Catalog на Python требуется Databricks Runtime версии 16.3 и выше.
Создание Python UDF для каталога пакетной обработки Unity
Создание пользовательской функции (UDF) пакетного каталога Unity аналогично созданию обычной пользовательской функции каталога Unity со следующими дополнениями:
-
PARAMETER STYLE PANDAS: это указывает, что UDF обрабатывает данные в пакетах с помощью итераторов pandas. -
HANDLER 'handler_function': это указывает функцию обработчика, которая вызывается для обработки пакетов.
В следующем примере показано, как создать Python UDF для каталога Unity для пакетных заданий.
%sql
CREATE OR REPLACE TEMPORARY FUNCTION calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for weight_series, height_series in batch_iter:
yield weight_series / (height_series ** 2)
$$;
После регистрации UDF его можно вызвать с помощью SQL или Python:
SELECT person_id, calculate_bmi_pandas(weight_kg, height_m) AS bmi
FROM (
SELECT 1 AS person_id, CAST(70.0 AS DOUBLE) AS weight_kg, CAST(1.75 AS DOUBLE) AS height_m UNION ALL
SELECT 2 AS person_id, CAST(80.0 AS DOUBLE) AS weight_kg, CAST(1.80 AS DOUBLE) AS height_m
);
Функция обработчика UDF для пакетной обработки
Для Python UDF каталога Unity требуется функция обработчика, которая обрабатывает пакеты и возвращает результаты. Необходимо указать имя функции обработчика при создании UDF, используя ключ HANDLER.
Функция обработчика выполняет следующие действия:
- Принимает аргумент итератора, который выполняет итерацию по одному или нескольким
pandas.Series. Каждыйpandas.Seriesсодержит входные параметры UDF. - Выполняет итерацию по генератору и обрабатывает данные.
- Возвращает генераторный итератор.
ППФ каталога Batch Unity должны возвращать то же количество строк, что и вход. Эта функция обработчика обеспечивает это путем получения pandas.Series той же длины, что и входной ряд для каждого пакета.
Установка пользовательских зависимостей
Вы можете расширить функциональные возможности пользовательских Python UDF в Unity Catalog для пакетной обработки за пределами среды выполнения Databricks, определив пользовательские зависимости для внешних библиотек.
См. Расширение UDF с использованием настраиваемых зависимостей.
Пакетные функции, определяемые пользователем, могут принимать один или несколько параметров.
Один параметр: Когда функция обработчика использует один входной параметр, он получает итератор, который выполняет итерацию по каждому пакету pandas.Series .
%sql
CREATE OR REPLACE TEMPORARY FUNCTION one_parameter_udf(value INT)
RETURNS STRING
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for value_batch in batch_iter:
d = {"min": value_batch.min(), "max": value_batch.max()}
yield pd.Series([str(d)] * len(value_batch))
$$;
SELECT one_parameter_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL;
Несколько параметров: Для нескольких входных параметров функция обработчика получает итератор, который выполняет итерацию по нескольким pandas.Series. Значения в серии находятся в том же порядке, что и входные параметры.
%sql
CREATE OR REPLACE TEMPORARY FUNCTION two_parameter_udf(p1 INT, p2 INT)
RETURNS INT
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for p1, p2 in batch_iter: # same order as arguments above
yield p1 + p2
$$;
SELECT two_parameter_udf(id , id + 1) from range(0, 100000, 3, 8);
Оптимизация производительности путем разделения дорогостоящих операций
Вы можете оптимизировать вычислительные дорогостоящие операции, разделив эти операции от функции обработчика. Это гарантирует, что они выполняются только один раз, а не во время каждой итерации по пакетам данных.
В следующем примере показано, как обеспечить выполнение дорогостоящих вычислений только один раз:
%sql
CREATE OR REPLACE TEMPORARY FUNCTION expensive_computation_udf(value INT)
RETURNS INT
LANGUAGE PYTHON
DETERMINISTIC
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
AS $$
def compute_value():
# expensive computation...
return 1
expensive_value = compute_value()
def handler_func(batch_iter):
for batch in batch_iter:
yield batch * expensive_value
$$;
SELECT expensive_computation_udf(id), count(*) from range(0, 100000, 3, 8) GROUP BY ALL
Изоляция среды
Замечание
Для общих сред изоляции требуется Databricks Runtime 17.1 и более поздних версий. В более ранних версиях все Python UDF Каталога Unity для пакетных заданий выполняются в строгом режиме изоляции.
Пакетные UDF Python в Unity Catalog с одинаковым владельцем и сеансом могут по умолчанию совместно использовать одну и ту же среду изоляции. Это может повысить производительность и уменьшить использование памяти, уменьшая количество отдельных сред, которые необходимо запустить.
Строгая изоляция
Чтобы убедиться, что UDF всегда выполняется в собственной, полностью изолированной среде, добавьте STRICT ISOLATION предложение характеристик.
Большинству определяемых пользователем пользователей не требуется строгая изоляция. Стандартные определяемые пользователем функции обработки данных используют общую среду изоляции по умолчанию и выполняются быстрее с меньшим потреблением памяти.
Добавьте предложение характеристики в определяемые STRICT ISOLATION пользователем функции:
- Выполнение входных данных в виде кода с помощью
eval()exec()или аналогичных функций - Запись файлов в локальную файловую систему
- Изменение глобальных переменных или состояния системы
- Изменение переменных среды
В следующем примере показан UDF, который выполняет входные данные в виде кода и требует строгой изоляции:
CREATE OR REPLACE TEMPORARY FUNCTION eval_string(input STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_func'
STRICT ISOLATION
AS $$
import pandas as pd
from typing import Iterator
def handler_func(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for code_series in batch_iter:
def eval_func(code):
try:
return str(eval(code))
except Exception as e:
return f"Error: {e}"
yield code_series.apply(eval_func)
$$;
Учетные данные службы в пользовательских файлах каталога Пакетной службы Unity
Пакетные пользовательские функции Python каталога Unity Catalog могут использовать учетные данные службы Unity Catalog для доступа к внешним облачным сервисам. Это особенно полезно для интеграции облачных функций, таких как маркеризаторы безопасности, в рабочие процессы обработки данных.
Замечание
API, специфичный для UDF, для учетных данных службы:
В пользовательских функциях используйте databricks.service_credentials.getServiceCredentialsProvider() для доступа к учетным данным службы.
Это отличается от функции, используемой dbutils.credentials.getServiceCredentialsProvider() в записных книжках, которая недоступна в контекстах выполнения UDF.
Сведения о создании учетных данных службы см. в разделе "Создание учетных данных службы".
Укажите учетные данные службы, которые вы хотите использовать в CREDENTIALS части определения UDF:
CREATE OR REPLACE TEMPORARY FUNCTION example_udf(data STRING)
RETURNS STRING
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
CREDENTIALS (
`credential-name` DEFAULT,
`complicated-credential-name` AS short_name,
`simple-cred`,
cred_no_quotes
)
AS $$
# Python code here
$$;
Разрешения учетных данных службы
Создатель UDF должен иметь ACCESS разрешение на учетные данные службы каталога Unity. Однако для пользователей, вызывающих UDF, достаточно предоставить им EXECUTE разрешение на UDF. В частности, вызывающие UDF не нуждаются в доступе к учетным данным базовой службы, так как UDF выполняется с помощью разрешений учетных данных создателя UDF.
Для временных функций создатель всегда является вызывающим элементом. Определяемые пользователем функции, которые выполняются в области No-PE, также известной как выделенные кластеры, используют разрешения вызывающего пользователя.
Учетные данные и псевдонимы по умолчанию
В условие можно включить несколько CREDENTIALS идентификаторов, но только один может быть помечен как DEFAULT. Вы можете псевдонимить учетные данные, отличные от по умолчанию, с помощью ключевого AS слова. У каждой учетной записи должен быть уникальный псевдоним.
Исправленные облачные SDK автоматически используют учетные данные по умолчанию. Учетные данные по умолчанию имеют приоритет над любой конфигурацией по умолчанию, указанной в настройках вычислений Spark, и сохраняются в определении UDF Каталога Unity.
Для использования поставщика azure-identity необходимо установить пакет DefaultAzureCredential. Используйте директиву ENVIRONMENT, чтобы установить внешние библиотеки. Дополнительные сведения об установке внешних библиотек см. в разделе «Расширение UDF с использованием пользовательских зависимостей».
Пример учетных данных службы — хранилище BLOB-объектов Azure
В следующем примере используются учетные данные службы для доступа к Azure Blob Storage из UDF каталога Unity для пакетных заданий.
%sql
CREATE OR REPLACE FUNCTION main.test.read_azure_blob(blob_name STRING) RETURNS STRING LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'batchhandler'
CREDENTIALS (
`batch-udf-service-creds-example-cred` DEFAULT
)
ENVIRONMENT (
dependencies = '["azure-identity", "azure-storage-blob"]', environment_version = 'None'
)
AS $$
import pandas as pd
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
def batchhandler(it):
# DefaultAzureCredential automatically uses the DEFAULT service credential
credential = DefaultAzureCredential()
blob_service = BlobServiceClient(
account_url="https://your-storage-account.blob.core.windows.net",
credential=credential
)
container = blob_service.get_container_client("your-container")
for blob_names in it:
results = []
for name in blob_names:
blob_client = container.get_blob_client(name)
try:
content = blob_client.download_blob().readall().decode("utf-8")
results.append(content)
except Exception as e:
results.append(f"Error: {e}")
yield pd.Series(results)
$$;
Вызовите UDF после его регистрации:
SELECT main.test.read_azure_blob(blob_name)
FROM VALUES
('config/settings.json'),
('data/input.txt')
AS t(blob_name)
Получить контекст выполнения задачи
Используйте TaskContext PySpark API, чтобы получить информацию о контексте, такую как идентификация пользователя, метки кластера, идентификатор задания Spark и многое другое. См. Получение контекста задачи в UDF.
Задать, DETERMINISTIC если функция создает согласованные результаты
Добавьте DETERMINISTIC в определение функции, если он создает те же выходные данные для одних и того же входных данных. Это позволяет оптимизировать запросы для повышения производительности.
По умолчанию определяемые пользователем определяемые пользователем UDT каталога Пакетной службы Unity считаются недетерминированными, если явно не объявлены. Примеры недетерминированных функций: создание случайных значений, доступ к текущим времени или датам или вызовы внешних API.
См. статьюCREATE FUNCTION (SQL и Python)
Ограничения
- Функции Python должны обрабатывать
NULLзначения независимо, и все сопоставления типов должны соответствовать сопоставлениям языка SQL Azure Databricks. - Пакетные UDF Python для каталога Unity выполняются в безопасной, изолированной среде и не имеют доступа к общей файловой системе или внутренним службам.
- Несколько вызовов UDF на стадии сериализуются, а промежуточные результаты материализуются и могут записываться на диск.
- Учетные данные службы доступны только в пользовательских файлах каталога Batch Unity и Скалярных определяемых пользователем Python. Они не поддерживаются в стандартных UDFS каталога Unity.
- В выделенных кластерах и для временных функций вызывающий функцию должен иметь
ACCESSразрешения на учетные данные службы. См . раздел "Предоставление разрешений на использование учетных данных службы для доступа к внешней облачной службе". - Включите функцию общедоступной предварительной версии включить сеть для определяемых пользователем пользователей в бессерверных хранилищах SQL настранице предварительной версии рабочей области, чтобы сделать вызовы UDF каталога Пакетной версии Unity для внешних служб на бессерверных вычислительных ресурсах хранилища SQL.
- Чтобы выполнять вызовы UDF каталога Unity Batch на бессерверной записной книжке или в вычислительной среде заданий, необходимо настроить управление бессерверным исходящим трафиком.