Определяемые пользователем операторы в Конструкторе Lakeflow

Important

Эта функция доступна в общедоступной предварительной версии.

Конструктор Lakeflow позволяет создавать определяемые пользователем операторы , которые отображаются непосредственно на холсте вместе со встроенными операторами. Используйте их для расширения Конструктора Lakeflow с помощью собственной бизнес-логики, вычислений или интеграции.

Существует три типа определяемых пользователем операторов:

  • python-run-function: автономный файл YAML со встроенным кодом Python, хранящийся в рабочей области. Лучше всего подходит для преобразований на уровне DataFrame и внешних интеграций. Разрешения управляются на уровне файла рабочей области.
  • uc-udf: Является оболочкой для скалярной функции Unity Catalog. Лучше всего подходит для преобразований на уровне столбцов. Доступ регулируется разрешениями каталога Unity.
  • uc-udtf: служит оболочкой для табличной функции Unity Catalog. Лучше всего подходит для преобразований на уровне таблицы, таких как кластеризация машинного обучения и агрегирование. Доступ регулируется разрешениями каталога Unity.
Функция python-run-function uc-udf uc-udtf
Пример варианта использования Преобразования DataFrame, интеграции API, уведомления по электронной почте Вычисления на уровне столбцов (BMI, процентные ставки) Кластеризация ML, агрегирование по строкам
Input таблицы данных Отдельные значения Вся таблица, строка по строке
Выходные данные таблицы данных Одно значение Таблица (несколько строк)
Требуется функция каталога Unity Нет Да Да
Управление доступом Разрешения для файлов рабочей области Разрешения каталога Unity (EXECUTE, USE SCHEMA) Разрешения каталога Unity (EXECUTE, USE SCHEMA)
Поддерживаемые языки Только Python SQL или Python в оболочке SQL SQL или Python в оболочке SQL

Как работают определяемые пользователем операторы?

Определяемый пользователем оператор состоит из следующих элементов:

  • Логика оператора: код, который выполняется при выполнении оператора. Это может быть встроенная функция Python run() (для python-run-function) или функция каталога Unity (для uc-udf и uc-udtf).
  • Конфигурация YAML: сообщает конструктору Lakeflow, как представить оператор в пользовательском интерфейсе, включая имя оператора, описание, входные параметры, мини-приложения пользовательского интерфейса и порты. Все типы операторов используют схему user-defined-operator-v0.1.0 .
  • Файл регистрации: запись в .user_defined_operators.yaml, которая позволяет Lakeflow Designer обнаружить оператор.

Логика оператора

Функция выполнения пользовательской логики оператора Python

Каждый python-run-function оператор должен определить функцию run() :

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
  • config: значения, заданные пользователем в пользовательском интерфейсе, с ключами в виде имён свойств.
  • inputs: входные DataFrames, где ключом является входной порт name.
  • spark: Активная SparkSession.
  • Возвращает: словарь, сопоставляющий значения выходных портов name объектам DataFrame.

В следующем примере фильтруются строки из входного DataFrame:

def run(config, inputs, spark):
    df = inputs["in"]
    filtered = df.filter(config["filter_expression"])
    return {"out": filtered}

Если для оператора требуются внешние пакеты pip, добавьте environment поле в YAML:

environment:
  environment_version: '1'
  dependencies:
    - requests==2.31.0
    - beautifulsoup4==4.12.0

Логика оператора UDF и UDTF

Функции UC можно записывать в SQL или Python. функции Python упаковываются в инструкцию SQL CREATE FUNCTION:

Функция SQL:

CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE SQL
RETURN
  SELECT weight_kg / (height_m * height_m);

функция Python (упакована в SQL):

CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
  return weight_kg / (height_m ** 2)
$$;

Пользовательские функции обрабатывают по одному значению за раз и возвращают вычисленное значение. UDTF-функции обрабатывают таблицы построчно и могут сохранять состояние между всеми строками. Используется uc-udf для преобразований на уровне столбцов и uc-udtf для операций, таких как кластеризация машинного обучения или агрегирование.

Кроме того, UDTF требуют определить три ключевых метода: __init__(), eval() и terminate():

class MyOperator:
    def __init__(self):
        # Called before processing - initialize any values needed.

    def eval(self, row, id_column, columns, k):
        # Called once per input row - accumulate data here.

    def terminate(self):
        # Called after all rows - perform final calculations and yield results.

Note

Возвращаемые таблицы UDTF должны иметь фиксированные, явные типы. В конфигурации возврата нельзя ссылаться на типы входных столбцов.

Конфигурация YAML

Конфигурация YAML сообщает конструктору Lakeflow, как представить оператор в пользовательском интерфейсе. Он определяет имя оператора, описание, входные параметры, мини-приложения пользовательского интерфейса и порты. Каждое поле конфигурации — это свойство с указанием типа, заголовка и необязательных x-ui подсказок мини-приложения:

config:
  type: object
  properties:
    my_param:
      type: string
      title: My Parameter
      x-ui:
        widget: input
    my_expression:
      type: string
      title: Column
      format: expression
      x-ui:
        widget: expression
        port: in
    my_number:
      type: number
      title: Count
      default: 10
      minimum: 0
      maximum: 100
  required:
    - my_param
    - my_expression

Полные сведения о схеме YAML, включая все типы мини-приложений и параметры конфигурации, см. в справочнике по определяемым пользователем оператору YAML.

Порты

Порты определяют входные и выходные данные для оператора:

ports:
  input:
    - name: in
      title: Input Data
      mime: application/vnd.databricks.dataframe
      required: true
      allowMultiple: false
  output:
    - name: out
      title: Output Data

YAML для операторов функции выполнения Python

Для операторов python-run-function файл YAML является автономным и включает поле run_function с встроенным кодом Python:

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Filter Rows
id: filter_rows
version: '1.0.0'
description: Filters rows based on a SQL expression.
config:
  type: object
  properties:
    filter_expression:
      type: string
      title: Filter Expression
      x-ui:
        widget: input
  required:
    - filter_expression
ports:
  input:
    - name: in
      title: Input
  output:
    - name: out
      title: Output
run_function:
  type: inline
  code: |
    def run(config, inputs, spark):
        df = inputs["in"]
        filtered = df.filter(config["filter_expression"])
        return {"out": filtered}

YAML для функций Unity Catalog

Для операторов на основе UC встраивайте конфигурацию YAML в свою функцию в виде комментария или строки документации.

В SQL (используйте комментарий /* ... */):

RETURN(/*
  schema: user-defined-operator-v0.1.0
  type: uc-udf
  name: Calculate BMI
  id: calculate_bmi
  version: "1.0.0"
  description: Calculates BMI from weight and height.
  config:
    type: object
    properties:
      weight_kg:
        type: string
        title: Weight (in kg)
        format: expression
        x-ui:
          widget: expression
          port: in
      height_m:
        type: string
        title: Height (in meters)
        format: expression
        x-ui:
          widget: expression
          port: in
    required:
      - weight_kg
      - height_m
  ports:
    input:
      - name: in
        title: Input Data
    output:
      - name: out
        title: Output
    */
  SELECT weight_kg / (height_m * height_m)
);

In Python (используйте """ ... """ docstring):

AS $$
  """
  schema: user-defined-operator-v0.1.0
  type: uc-udf
  name: Calculate BMI
  id: calculate_bmi
  version: "1.0.0"
  description: Calculates BMI from weight and height.
  config:
    type: object
    properties:
      weight_kg:
        type: string
        title: Weight (in kg)
        format: expression
        x-ui:
          widget: expression
          port: in
      height_m:
        type: string
        title: Height (in meters)
        format: expression
        x-ui:
          widget: expression
          port: in
    required:
      - weight_kg
      - height_m
  ports:
    input:
      - name: in
        title: Input Data
    output:
      - name: out
        title: Output
  """

  return weight_kg / (height_m ** 2)
$$;

Регистрация и развертывание оператора в Конструкторе Lakeflow

Чтобы ваш оператор отображался в Конструкторе Lakeflow, зарегистрируйте его в файле .user_defined_operators.yaml:

  • Уровень рабочей области: Поместите файл в корень рабочей области, чтобы сделать оператор видимым для всех пользователей.
  • Уровень пользователя: Поместите файл в домашнюю папку пользователя (/Workspace/Users/<user-name>/.user_defined_operators.yaml), чтобы операторы были видимы только для вас.

Раздел operators: поддерживает пути к файлам, ссылки на функции Unity Catalog и glob-шаблоны. Вы можете смешивать типы записей:

operators:
  # File path (python-run-function operators)
  - /Workspace/Users/me/udos/my_operator.yaml
  # Glob pattern (registers all matching files)
  - /Workspace/Users/me/udos/transforms/*.yaml
  # UC function reference (uc-udf and uc-udtf operators)
  - catalog: my_catalog
    schema: my_schema
    functionName: my_function

Дополнительные конфигурации

Режим предварительного просмотра

Конструктор Lakeflow поддерживает предварительные версии в режиме конструктора. Для операторов, которые вызывают внешние API или записывают данные во внешние системы, добавьте свойство конфигурации is_preview, чтобы пропускать побочные эффекты во время предварительного просмотра. Если режим предварительной версии включен, пользователям необходимо явно нажать кнопку "Выполнить ", чтобы выполнить оператор с побочными эффектами.

config:
  type: object
  properties:
    is_preview:
      type: boolean
      format: is_preview
      default: false

Конструктор Lakeflow автоматически задает это значение true во время предварительной версии. Проверьте это в логике, чтобы избежать побочных эффектов:

# In a python-run-function
if config.get("is_preview"):
    return {"out": inputs["in"]}

# In a UC function (SQL)
CASE WHEN is_preview THEN 'preview' ELSE /* actual work */ END

Подключения каталога Unity

Для операторов SQL на основе UC, вызывающих внешние API, используйте HTTP-подключения каталога Unity для безопасного хранения учетных данных:

CREATE CONNECTION my_api_connection TYPE HTTP OPTIONS (
  host 'https://api.example.com',
  port '443',
  base_path '/v1/',
  bearer_token 'your-token-here'
);

Затем используйте подключение в UDF SQL с функцией http_request() . Дополнительные сведения см. в разделе "Подключение к внешним службам HTTP".

WorkspaceClient

Для операторов python-run-function можно использовать Azure Databricks WorkspaceClient для доступа к ресурсам рабочей области и внешним API:

def run(config, inputs, spark):
    from databricks.sdk import WorkspaceClient
    w = WorkspaceClient()
    # Use w to access workspace resources

Создайте полнофункциональный пользовательский оператор python-run-function

Ниже описано, как создать python-run-function оператор с нуля.

Шаг 1. Определение логики

Напишите свою функцию run() в блокноте:

from typing import Dict, Any

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
    from pyspark.sql import functions as F
    df = inputs["in"]
    result = df.withColumn(config["column_name"], F.current_timestamp())
    return {"out": result}

Шаг 2. Проверка функции

Протестируйте функцию в интерактивном режиме с помощью примеров данных:

test_df = spark.createDataFrame(
    [("Alice", 100), ("Bob", 200)],
    ["name", "amount"]
)

result = run(
    config={"column_name": "processed_at"},
    inputs={"in": test_df},
    spark=spark
)

result["out"].show()

Шаг 3. Создание конфигурации YAML

Определите метаданные оператора, поля конфигурации и порты в ФАЙЛЕ YAML:

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
  type: object
  properties:
    column_name:
      type: string
      title: Column Name
      default: processed_at
      x-ui:
        widget: input
  required:
    - column_name

Шаг 4. Объединение логики и YAML

Добавьте поля run_function и ports, чтобы создать полный YAML-файл. Сохраните его в рабочей области, например /Workspace/Users/<user-name>/udos/add_timestamp.yaml:

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
  type: object
  properties:
    column_name:
      type: string
      title: Column Name
      default: processed_at
      x-ui:
        widget: input
  required:
    - column_name
ports:
  input:
    - name: in
      title: Input
  output:
    - name: out
      title: Output
run_function:
  type: inline
  code: |
    from typing import Dict, Any

    def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
        from pyspark.sql import functions as F
        df = inputs["in"]
        result = df.withColumn(config["column_name"], F.current_timestamp())
        return {"out": result}

Шаг 5. Регистрация оператора

Добавьте путь к .user_defined_operators.yaml файлу:

operators:
  - /Workspace/Users/<user-name>/udos/add_timestamp.yaml

Шаг 6. Использование оператора в конструкторе Lakeflow

Откройте конструктор Lakeflow и убедитесь, что оператор отображается в палитре операторов. Перетащите его на холст, подключите входные данные, настройте имя столбца и запустите предварительную версию.

Создайте полностью определяемый пользователем оператор UC

В следующих шагах описывается создание оператора на основе UC uc-udf.

Шаг 1. Определение логики

Напишите и проверьте логику функции в записной книжке:

def double_value(input_value: float) -> float:
    if input_value is None:
        return None
    return input_value * 2

Шаг 2. Создание конфигурации YAML

Определите метаданные оператора, поля конфигурации и порты:

schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: '1.0.0'
description: Doubles the input value
config:
  type: object
  properties:
    input_value:
      type: string
      title: Input Value
      format: expression
      x-ui:
        widget: expression
        port: input_data
  required:
    - input_value
ports:
  input:
    - name: input_data
      title: Input
  output:
    - name: out
      title: Output

Шаг 3. Объединение логики и YAML

Создайте функцию Unity Catalog со встроенным YAML в виде строки документации:

CREATE OR REPLACE FUNCTION main.my_schema.double_value(input_value DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
  """
  schema: user-defined-operator-v0.1.0
  type: uc-udf
  name: Double Value
  id: math.double_value
  version: "1.0.0"
  description: Doubles the input value
  config:
    type: object
    properties:
      input_value:
        type: string
        title: Input Value
        format: expression
        x-ui:
          widget: expression
          port: input_data
    required:
      - input_value
  ports:
    input:
      - name: input_data
        title: Input
    output:
      - name: out
        title: Output
  """

  def double_value(input_value: float) -> float:
      if input_value is None:
          return None
      return input_value * 2

  return double_value(input_value)
$$

Шаг 4. Проверка функции

SELECT main.my_schema.double_value(5) AS result;
-- Should return: 10

Шаг 5. Регистрация оператора

Добавьте ссылку на функцию каталога Unity в .user_defined_operators.yaml файл:

operators:
  - catalog: main
    schema: my_schema
    functionName: double_value

Шаг 6. Использование оператора в конструкторе Lakeflow

Откройте конструктор Lakeflow и убедитесь, что оператор отображается в палитре операторов. Перетащите его на холст, подключите входные данные и запустите предварительный просмотр.

Troubleshooting

Issue Решение
Оператор не отображается в конструкторе Lakeflow. Убедитесь, что .user_defined_operators.yaml существует и содержит вашу функцию или путь к файлу. Для python-run-function операторов проверьте путь к файлу и доступ к файлу YAML.
Не удалось выполнить проверку схемы. Проверьте ваш YAML на соответствие официальной схеме по адресу https://your-workspace.cloud.databricks.com/static/schemas/user-defined-operator-v0.1.0.json.
в разрешении отказано. Для операторов на базе UC убедитесь, что у пользователей есть EXECUTE для функции и USE SCHEMA для схемы. Для python-run-function операторов убедитесь, что у пользователей есть доступ на чтение к файлу YAML.
python-run-function оператор не работает во время выполнения. Убедитесь, что сигнатура run() функции соответствует def run(config, inputs, spark). Убедитесь, что имена портов в коде соответствуют YAML и что ключи словаря возвращают значения выходных портов name .
UDTF возвращает неправильные типы. Типы возвращаемых значений UDTF должны быть явными— нельзя ссылаться на типы входных столбцов.

Permissions

Разрешение Purpose
Доступ к.user_defined_operators.yaml для чтения. Найдите оператора.
Доступ на чтение к файлу YAML (python-run-function только). Загрузите определение оператора.
EXECUTE для функции Unity Catalog (только для операторов на основе UC). Запустите оператор.
USE SCHEMA в схеме (только операторы на основе UC). Доступ к схеме, в которой создается функция.
Другие разрешения В зависимости от оператора пользователи могут требовать другие разрешения. Например, USE CONNECTION в подключении Unity Catalog для вызовов HTTP API.

Дальнейшие действия

Ознакомьтесь со следующими руководствами.

Example Тип Description
Отправитель электронной почты Gmail python-run-function Отправьте данные DataFrame в виде CSV-вложения к электронному письму через Gmail.
Калькулятор составных процентов uc-udf Вычислите будущие инвестиционные значения с помощью составной формулы интереса.
Кластеризация методом k-средних uc-udtf Сегментируйте данные в кластеры с помощью scikit-learn.
Отправка сообщения Slack uc-udf Отправка уведомлений в каналы Slack через API.
Все мини-приложения пользовательского интерфейса uc-udf Оператор ссылок показывает все доступные мини-приложения пользовательского интерфейса.

Полный справочник по схеме YAML см. в справочнике по YAML для определяемого пользователем оператора.