Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Important
Эта функция доступна в общедоступной предварительной версии.
Конструктор Lakeflow позволяет создавать определяемые пользователем операторы , которые отображаются непосредственно на холсте вместе со встроенными операторами. Используйте их для расширения Конструктора Lakeflow с помощью собственной бизнес-логики, вычислений или интеграции.
Существует три типа определяемых пользователем операторов:
-
python-run-function: автономный файл YAML со встроенным кодом Python, хранящийся в рабочей области. Лучше всего подходит для преобразований на уровне DataFrame и внешних интеграций. Разрешения управляются на уровне файла рабочей области. -
uc-udf: Является оболочкой для скалярной функции Unity Catalog. Лучше всего подходит для преобразований на уровне столбцов. Доступ регулируется разрешениями каталога Unity. -
uc-udtf: служит оболочкой для табличной функции Unity Catalog. Лучше всего подходит для преобразований на уровне таблицы, таких как кластеризация машинного обучения и агрегирование. Доступ регулируется разрешениями каталога Unity.
| Функция | python-run-function |
uc-udf |
uc-udtf |
|---|---|---|---|
| Пример варианта использования | Преобразования DataFrame, интеграции API, уведомления по электронной почте | Вычисления на уровне столбцов (BMI, процентные ставки) | Кластеризация ML, агрегирование по строкам |
| Input | таблицы данных | Отдельные значения | Вся таблица, строка по строке |
| Выходные данные | таблицы данных | Одно значение | Таблица (несколько строк) |
| Требуется функция каталога Unity | Нет | Да | Да |
| Управление доступом | Разрешения для файлов рабочей области | Разрешения каталога Unity (EXECUTE, USE SCHEMA) |
Разрешения каталога Unity (EXECUTE, USE SCHEMA) |
| Поддерживаемые языки | Только Python | SQL или Python в оболочке SQL | SQL или Python в оболочке SQL |
Как работают определяемые пользователем операторы?
Определяемый пользователем оператор состоит из следующих элементов:
-
Логика оператора: код, который выполняется при выполнении оператора. Это может быть встроенная функция Python
run()(дляpython-run-function) или функция каталога Unity (дляuc-udfиuc-udtf). -
Конфигурация YAML: сообщает конструктору Lakeflow, как представить оператор в пользовательском интерфейсе, включая имя оператора, описание, входные параметры, мини-приложения пользовательского интерфейса и порты. Все типы операторов используют схему
user-defined-operator-v0.1.0. -
Файл регистрации: запись в
.user_defined_operators.yaml, которая позволяет Lakeflow Designer обнаружить оператор.
Логика оператора
Функция выполнения пользовательской логики оператора Python
Каждый python-run-function оператор должен определить функцию run() :
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
-
config: значения, заданные пользователем в пользовательском интерфейсе, с ключами в виде имён свойств. -
inputs: входные DataFrames, где ключом является входной портname. -
spark: Активная SparkSession. -
Возвращает: словарь, сопоставляющий значения выходных портов
nameобъектам DataFrame.
В следующем примере фильтруются строки из входного DataFrame:
def run(config, inputs, spark):
df = inputs["in"]
filtered = df.filter(config["filter_expression"])
return {"out": filtered}
Если для оператора требуются внешние пакеты pip, добавьте environment поле в YAML:
environment:
environment_version: '1'
dependencies:
- requests==2.31.0
- beautifulsoup4==4.12.0
Логика оператора UDF и UDTF
Функции UC можно записывать в SQL или Python. функции Python упаковываются в инструкцию SQL CREATE FUNCTION:
Функция SQL:
CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE SQL
RETURN
SELECT weight_kg / (height_m * height_m);
функция Python (упакована в SQL):
CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
return weight_kg / (height_m ** 2)
$$;
Пользовательские функции обрабатывают по одному значению за раз и возвращают вычисленное значение. UDTF-функции обрабатывают таблицы построчно и могут сохранять состояние между всеми строками. Используется uc-udf для преобразований на уровне столбцов и uc-udtf для операций, таких как кластеризация машинного обучения или агрегирование.
Кроме того, UDTF требуют определить три ключевых метода: __init__(), eval() и terminate():
class MyOperator:
def __init__(self):
# Called before processing - initialize any values needed.
def eval(self, row, id_column, columns, k):
# Called once per input row - accumulate data here.
def terminate(self):
# Called after all rows - perform final calculations and yield results.
Note
Возвращаемые таблицы UDTF должны иметь фиксированные, явные типы. В конфигурации возврата нельзя ссылаться на типы входных столбцов.
Конфигурация YAML
Конфигурация YAML сообщает конструктору Lakeflow, как представить оператор в пользовательском интерфейсе. Он определяет имя оператора, описание, входные параметры, мини-приложения пользовательского интерфейса и порты. Каждое поле конфигурации — это свойство с указанием типа, заголовка и необязательных x-ui подсказок мини-приложения:
config:
type: object
properties:
my_param:
type: string
title: My Parameter
x-ui:
widget: input
my_expression:
type: string
title: Column
format: expression
x-ui:
widget: expression
port: in
my_number:
type: number
title: Count
default: 10
minimum: 0
maximum: 100
required:
- my_param
- my_expression
Полные сведения о схеме YAML, включая все типы мини-приложений и параметры конфигурации, см. в справочнике по определяемым пользователем оператору YAML.
Порты
Порты определяют входные и выходные данные для оператора:
ports:
input:
- name: in
title: Input Data
mime: application/vnd.databricks.dataframe
required: true
allowMultiple: false
output:
- name: out
title: Output Data
YAML для операторов функции выполнения Python
Для операторов python-run-function файл YAML является автономным и включает поле run_function с встроенным кодом Python:
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Filter Rows
id: filter_rows
version: '1.0.0'
description: Filters rows based on a SQL expression.
config:
type: object
properties:
filter_expression:
type: string
title: Filter Expression
x-ui:
widget: input
required:
- filter_expression
ports:
input:
- name: in
title: Input
output:
- name: out
title: Output
run_function:
type: inline
code: |
def run(config, inputs, spark):
df = inputs["in"]
filtered = df.filter(config["filter_expression"])
return {"out": filtered}
YAML для функций Unity Catalog
Для операторов на основе UC встраивайте конфигурацию YAML в свою функцию в виде комментария или строки документации.
В SQL (используйте комментарий /* ... */):
RETURN(/*
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Calculate BMI
id: calculate_bmi
version: "1.0.0"
description: Calculates BMI from weight and height.
config:
type: object
properties:
weight_kg:
type: string
title: Weight (in kg)
format: expression
x-ui:
widget: expression
port: in
height_m:
type: string
title: Height (in meters)
format: expression
x-ui:
widget: expression
port: in
required:
- weight_kg
- height_m
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
*/
SELECT weight_kg / (height_m * height_m)
);
In Python (используйте """ ... """ docstring):
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Calculate BMI
id: calculate_bmi
version: "1.0.0"
description: Calculates BMI from weight and height.
config:
type: object
properties:
weight_kg:
type: string
title: Weight (in kg)
format: expression
x-ui:
widget: expression
port: in
height_m:
type: string
title: Height (in meters)
format: expression
x-ui:
widget: expression
port: in
required:
- weight_kg
- height_m
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
"""
return weight_kg / (height_m ** 2)
$$;
Регистрация и развертывание оператора в Конструкторе Lakeflow
Чтобы ваш оператор отображался в Конструкторе Lakeflow, зарегистрируйте его в файле .user_defined_operators.yaml:
- Уровень рабочей области: Поместите файл в корень рабочей области, чтобы сделать оператор видимым для всех пользователей.
-
Уровень пользователя: Поместите файл в домашнюю папку пользователя (
/Workspace/Users/<user-name>/.user_defined_operators.yaml), чтобы операторы были видимы только для вас.
Раздел operators: поддерживает пути к файлам, ссылки на функции Unity Catalog и glob-шаблоны. Вы можете смешивать типы записей:
operators:
# File path (python-run-function operators)
- /Workspace/Users/me/udos/my_operator.yaml
# Glob pattern (registers all matching files)
- /Workspace/Users/me/udos/transforms/*.yaml
# UC function reference (uc-udf and uc-udtf operators)
- catalog: my_catalog
schema: my_schema
functionName: my_function
Дополнительные конфигурации
Режим предварительного просмотра
Конструктор Lakeflow поддерживает предварительные версии в режиме конструктора. Для операторов, которые вызывают внешние API или записывают данные во внешние системы, добавьте свойство конфигурации is_preview, чтобы пропускать побочные эффекты во время предварительного просмотра. Если режим предварительной версии включен, пользователям необходимо явно нажать кнопку "Выполнить ", чтобы выполнить оператор с побочными эффектами.
config:
type: object
properties:
is_preview:
type: boolean
format: is_preview
default: false
Конструктор Lakeflow автоматически задает это значение true во время предварительной версии. Проверьте это в логике, чтобы избежать побочных эффектов:
# In a python-run-function
if config.get("is_preview"):
return {"out": inputs["in"]}
# In a UC function (SQL)
CASE WHEN is_preview THEN 'preview' ELSE /* actual work */ END
Подключения каталога Unity
Для операторов SQL на основе UC, вызывающих внешние API, используйте HTTP-подключения каталога Unity для безопасного хранения учетных данных:
CREATE CONNECTION my_api_connection TYPE HTTP OPTIONS (
host 'https://api.example.com',
port '443',
base_path '/v1/',
bearer_token 'your-token-here'
);
Затем используйте подключение в UDF SQL с функцией http_request() . Дополнительные сведения см. в разделе "Подключение к внешним службам HTTP".
WorkspaceClient
Для операторов python-run-function можно использовать Azure Databricks WorkspaceClient для доступа к ресурсам рабочей области и внешним API:
def run(config, inputs, spark):
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Use w to access workspace resources
Создайте полнофункциональный пользовательский оператор python-run-function
Ниже описано, как создать python-run-function оператор с нуля.
Шаг 1. Определение логики
Напишите свою функцию run() в блокноте:
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
from pyspark.sql import functions as F
df = inputs["in"]
result = df.withColumn(config["column_name"], F.current_timestamp())
return {"out": result}
Шаг 2. Проверка функции
Протестируйте функцию в интерактивном режиме с помощью примеров данных:
test_df = spark.createDataFrame(
[("Alice", 100), ("Bob", 200)],
["name", "amount"]
)
result = run(
config={"column_name": "processed_at"},
inputs={"in": test_df},
spark=spark
)
result["out"].show()
Шаг 3. Создание конфигурации YAML
Определите метаданные оператора, поля конфигурации и порты в ФАЙЛЕ YAML:
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
type: object
properties:
column_name:
type: string
title: Column Name
default: processed_at
x-ui:
widget: input
required:
- column_name
Шаг 4. Объединение логики и YAML
Добавьте поля run_function и ports, чтобы создать полный YAML-файл. Сохраните его в рабочей области, например /Workspace/Users/<user-name>/udos/add_timestamp.yaml:
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
type: object
properties:
column_name:
type: string
title: Column Name
default: processed_at
x-ui:
widget: input
required:
- column_name
ports:
input:
- name: in
title: Input
output:
- name: out
title: Output
run_function:
type: inline
code: |
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
from pyspark.sql import functions as F
df = inputs["in"]
result = df.withColumn(config["column_name"], F.current_timestamp())
return {"out": result}
Шаг 5. Регистрация оператора
Добавьте путь к .user_defined_operators.yaml файлу:
operators:
- /Workspace/Users/<user-name>/udos/add_timestamp.yaml
Шаг 6. Использование оператора в конструкторе Lakeflow
Откройте конструктор Lakeflow и убедитесь, что оператор отображается в палитре операторов. Перетащите его на холст, подключите входные данные, настройте имя столбца и запустите предварительную версию.
Создайте полностью определяемый пользователем оператор UC
В следующих шагах описывается создание оператора на основе UC uc-udf.
Шаг 1. Определение логики
Напишите и проверьте логику функции в записной книжке:
def double_value(input_value: float) -> float:
if input_value is None:
return None
return input_value * 2
Шаг 2. Создание конфигурации YAML
Определите метаданные оператора, поля конфигурации и порты:
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: '1.0.0'
description: Doubles the input value
config:
type: object
properties:
input_value:
type: string
title: Input Value
format: expression
x-ui:
widget: expression
port: input_data
required:
- input_value
ports:
input:
- name: input_data
title: Input
output:
- name: out
title: Output
Шаг 3. Объединение логики и YAML
Создайте функцию Unity Catalog со встроенным YAML в виде строки документации:
CREATE OR REPLACE FUNCTION main.my_schema.double_value(input_value DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: "1.0.0"
description: Doubles the input value
config:
type: object
properties:
input_value:
type: string
title: Input Value
format: expression
x-ui:
widget: expression
port: input_data
required:
- input_value
ports:
input:
- name: input_data
title: Input
output:
- name: out
title: Output
"""
def double_value(input_value: float) -> float:
if input_value is None:
return None
return input_value * 2
return double_value(input_value)
$$
Шаг 4. Проверка функции
SELECT main.my_schema.double_value(5) AS result;
-- Should return: 10
Шаг 5. Регистрация оператора
Добавьте ссылку на функцию каталога Unity в .user_defined_operators.yaml файл:
operators:
- catalog: main
schema: my_schema
functionName: double_value
Шаг 6. Использование оператора в конструкторе Lakeflow
Откройте конструктор Lakeflow и убедитесь, что оператор отображается в палитре операторов. Перетащите его на холст, подключите входные данные и запустите предварительный просмотр.
Troubleshooting
| Issue | Решение |
|---|---|
| Оператор не отображается в конструкторе Lakeflow. | Убедитесь, что .user_defined_operators.yaml существует и содержит вашу функцию или путь к файлу. Для python-run-function операторов проверьте путь к файлу и доступ к файлу YAML. |
| Не удалось выполнить проверку схемы. | Проверьте ваш YAML на соответствие официальной схеме по адресу https://your-workspace.cloud.databricks.com/static/schemas/user-defined-operator-v0.1.0.json. |
| в разрешении отказано. | Для операторов на базе UC убедитесь, что у пользователей есть EXECUTE для функции и USE SCHEMA для схемы. Для python-run-function операторов убедитесь, что у пользователей есть доступ на чтение к файлу YAML. |
python-run-function оператор не работает во время выполнения. |
Убедитесь, что сигнатура run() функции соответствует def run(config, inputs, spark). Убедитесь, что имена портов в коде соответствуют YAML и что ключи словаря возвращают значения выходных портов name . |
| UDTF возвращает неправильные типы. | Типы возвращаемых значений UDTF должны быть явными— нельзя ссылаться на типы входных столбцов. |
Permissions
| Разрешение | Purpose |
|---|---|
Доступ к.user_defined_operators.yaml для чтения. |
Найдите оператора. |
Доступ на чтение к файлу YAML (python-run-function только). |
Загрузите определение оператора. |
| EXECUTE для функции Unity Catalog (только для операторов на основе UC). | Запустите оператор. |
| USE SCHEMA в схеме (только операторы на основе UC). | Доступ к схеме, в которой создается функция. |
| Другие разрешения | В зависимости от оператора пользователи могут требовать другие разрешения. Например, USE CONNECTION в подключении Unity Catalog для вызовов HTTP API. |
Дальнейшие действия
Ознакомьтесь со следующими руководствами.
| Example | Тип | Description |
|---|---|---|
| Отправитель электронной почты Gmail | python-run-function |
Отправьте данные DataFrame в виде CSV-вложения к электронному письму через Gmail. |
| Калькулятор составных процентов | uc-udf |
Вычислите будущие инвестиционные значения с помощью составной формулы интереса. |
| Кластеризация методом k-средних | uc-udtf |
Сегментируйте данные в кластеры с помощью scikit-learn. |
| Отправка сообщения Slack | uc-udf |
Отправка уведомлений в каналы Slack через API. |
| Все мини-приложения пользовательского интерфейса | uc-udf |
Оператор ссылок показывает все доступные мини-приложения пользовательского интерфейса. |
Полный справочник по схеме YAML см. в справочнике по YAML для определяемого пользователем оператора.