Operadores definidos por el usuario en Lakeflow Designer

Important

Esta característica está en versión preliminar pública.

Lakeflow Designer permite crear operadores definidos por el usuario que aparecen directamente en el lienzo junto con operadores integrados. Úselos para ampliar Lakeflow Designer con su propia lógica de negocios, cálculos o integraciones.

Hay tres tipos de operadores definidos por el usuario:

  • python-run-function: un archivo YAML independiente con código Python integrado almacenado en el área de trabajo. Mejor para las transformaciones de nivel de DataFrame y las integraciones externas. Los permisos se administran en el nivel de archivo del área de trabajo.
  • uc-udf: Encapsula una función escalar de Unity Catalog. Ideal para transformaciones a nivel de columna. El acceso se rige por los permisos del catálogo de Unity.
  • uc-udtf: Encapsula una función de tabla de Unity Catalog. Lo mejor para las transformaciones de nivel de tabla, como la agrupación en clústeres de ML y la agregación. El acceso se rige por los permisos del catálogo de Unity.
Feature python-run-function uc-udf uc-udtf
Ejemplo de caso de uso Transformaciones de DataFrame, integraciones de API, notificaciones por correo electrónico Cálculos de nivel de columna (IMC, tasas de interés) Agrupación por clústeres de aprendizaje automático, agregación a través de las filas
Entrada Marcos de datos Valores únicos Tabla completa, fila por fila
Output Marcos de datos Valor único Tabla (varias filas)
Requiere la función catálogo de Unity No
Gobernanza del acceso Permisos de archivo del área de trabajo Permisos de Unity Catalog (EXECUTE, USE SCHEMA) Permisos de Unity Catalog (EXECUTE, USE SCHEMA)
Idiomas compatibles Únicamente Python SQL o Python en un contenedor de SQL SQL o Python en un contenedor de SQL

¿Cómo funcionan los operadores definidos por el usuario?

Un operador definido por el usuario consta de:

  • Lógica del operador: código que se ejecuta cuando se ejecuta el operador. Puede ser una función de Python en línea run() (para python-run-function) o una función de Unity Catalog (para uc-udf y uc-udtf).
  • Configuración de YAML: indica al Diseñador de Lakeflow cómo presentar el operador en la interfaz de usuario, incluido el nombre del operador, la descripción, los parámetros de entrada, los widgets de interfaz de usuario y los puertos. Todos los tipos de operador usan el user-defined-operator-v0.1.0 esquema.
  • Archivo de registro: una entrada en .user_defined_operators.yaml que permite que Lakeflow Designer detecte el operador.

Lógica del operador

Lógica del operador definido por el usuario de la función run de Python

Cada python-run-function operador debe definir una run() función:

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
  • config: Valores configurados por el usuario en la interfaz de usuario, indexados por nombre de propiedad.
  • inputs: DataFrames de entrada, indexados por el puerto de entrada name.
  • spark: La SparkSession activa.
  • Devuelve: Un diccionario que asigna los valores del puerto de salida name a DataFrames.

En el ejemplo siguiente se filtran las filas de un dataframe de entrada:

def run(config, inputs, spark):
    df = inputs["in"]
    filtered = df.filter(config["filter_expression"])
    return {"out": filtered}

Si el operador requiere paquetes pip externos, agregue el environment campo a YAML:

environment:
  environment_version: '1'
  dependencies:
    - requests==2.31.0
    - beautifulsoup4==4.12.0

Lógica del operador UDF y UDTF

Puede escribir funciones uc en SQL o Python. Las funciones de Python se envuelven en una instrucción SQL CREATE FUNCTION:

Función SQL:

CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE SQL
RETURN
  SELECT weight_kg / (height_m * height_m);

función Python (encapsulada en SQL):

CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
  return weight_kg / (height_m ** 2)
$$;

Las UDF procesan un único valor a la vez y devuelven un valor calculado. Las UDTF procesan tablas fila por fila y pueden mantener un estado a lo largo de todas las filas. Se usa uc-udf para transformaciones de nivel de columna y uc-udtf para operaciones como la agrupación en clústeres de ML o la agregación.

Además, las UDTF requieren que defina tres métodos clave: __init__(), eval() y terminate():

class MyOperator:
    def __init__(self):
        # Called before processing - initialize any values needed.

    def eval(self, row, id_column, columns, k):
        # Called once per input row - accumulate data here.

    def terminate(self):
        # Called after all rows - perform final calculations and yield results.

Note

Las tablas de retorno UDTF deben tener tipos fijos y explícitos. No se puede hacer referencia a los tipos de columna de entrada en la configuración de retorno.

Configuración de YAML

La configuración de YAML indica al Diseñador de Lakeflow cómo presentar el operador en la interfaz de usuario. Define el nombre, la descripción, los parámetros de entrada, los widgets de interfaz de usuario y los puertos del operador. Cada campo de configuración es una propiedad con un tipo, un título e indicaciones opcionales del widget x-ui:

config:
  type: object
  properties:
    my_param:
      type: string
      title: My Parameter
      x-ui:
        widget: input
    my_expression:
      type: string
      title: Column
      format: expression
      x-ui:
        widget: expression
        port: in
    my_number:
      type: number
      title: Count
      default: 10
      minimum: 0
      maximum: 100
  required:
    - my_param
    - my_expression

Para obtener información completa sobre el esquema YAML, incluidos todos los tipos de widget y las opciones de configuración, consulte Referencia de YAML del operador definido por el usuario.

Puertos

Los puertos definen las entradas y salidas del operador:

ports:
  input:
    - name: in
      title: Input Data
      mime: application/vnd.databricks.dataframe
      required: true
      allowMultiple: false
  output:
    - name: out
      title: Output Data

YAML para operadores de la función run de Python

Para operadores de python-run-function, el archivo YAML es independiente e incluye un campo run_function con código Python integrado:

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Filter Rows
id: filter_rows
version: '1.0.0'
description: Filters rows based on a SQL expression.
config:
  type: object
  properties:
    filter_expression:
      type: string
      title: Filter Expression
      x-ui:
        widget: input
  required:
    - filter_expression
ports:
  input:
    - name: in
      title: Input
  output:
    - name: out
      title: Output
run_function:
  type: inline
  code: |
    def run(config, inputs, spark):
        df = inputs["in"]
        filtered = df.filter(config["filter_expression"])
        return {"out": filtered}

Funciones de catálogo de YAML para Unity

En el caso de los operadores basados en UC, inserte la configuración de YAML como comentario o docstring en la función.

En SQL (use el comentario /* ... */):

RETURN(/*
  schema: user-defined-operator-v0.1.0
  type: uc-udf
  name: Calculate BMI
  id: calculate_bmi
  version: "1.0.0"
  description: Calculates BMI from weight and height.
  config:
    type: object
    properties:
      weight_kg:
        type: string
        title: Weight (in kg)
        format: expression
        x-ui:
          widget: expression
          port: in
      height_m:
        type: string
        title: Height (in meters)
        format: expression
        x-ui:
          widget: expression
          port: in
    required:
      - weight_kg
      - height_m
  ports:
    input:
      - name: in
        title: Input Data
    output:
      - name: out
        title: Output
    */
  SELECT weight_kg / (height_m * height_m)
);

In Python (use """ ... """ docstring):

AS $$
  """
  schema: user-defined-operator-v0.1.0
  type: uc-udf
  name: Calculate BMI
  id: calculate_bmi
  version: "1.0.0"
  description: Calculates BMI from weight and height.
  config:
    type: object
    properties:
      weight_kg:
        type: string
        title: Weight (in kg)
        format: expression
        x-ui:
          widget: expression
          port: in
      height_m:
        type: string
        title: Height (in meters)
        format: expression
        x-ui:
          widget: expression
          port: in
    required:
      - weight_kg
      - height_m
  ports:
    input:
      - name: in
        title: Input Data
    output:
      - name: out
        title: Output
  """

  return weight_kg / (height_m ** 2)
$$;

Registro e implementación del operador en Lakeflow Designer

Para que el operador aparezca en Lakeflow Designer, regístrelo en un .user_defined_operators.yaml archivo:

  • Nivel de área de trabajo: Coloque el archivo en la raíz del área de trabajo para que el operador sea visible para todos los usuarios.
  • Nivel de usuario: Coloque el archivo en la carpeta principal del usuario (/Workspace/Users/<user-name>/.user_defined_operators.yaml) para que los operadores sean visibles solo para usted.

La operators: sección admite rutas de acceso de archivo, referencias a funciones de Catálogo de Unity y patrones globales. Puede mezclar tipos de entrada:

operators:
  # File path (python-run-function operators)
  - /Workspace/Users/me/udos/my_operator.yaml
  # Glob pattern (registers all matching files)
  - /Workspace/Users/me/udos/transforms/*.yaml
  # UC function reference (uc-udf and uc-udtf operators)
  - catalog: my_catalog
    schema: my_schema
    functionName: my_function

Configuraciones avanzadas

Modo de vista previa

Lakeflow Designer admite vistas previas mientras están en modo de diseño. Para los operadores que realizan llamadas a API externas o escriben en sistemas externos, añade una propiedad de configuración is_preview para poder omitir los efectos secundarios durante la vista previa. Cuando el modo de vista previa está habilitado, los usuarios deben hacer clic explícitamente en Ejecutar para ejecutar el operador con efectos secundarios.

config:
  type: object
  properties:
    is_preview:
      type: boolean
      format: is_preview
      default: false

Lakeflow Designer establece automáticamente este valor a true durante la vista previa. Compruébalo en la lógica para omitir los efectos secundarios:

# In a python-run-function
if config.get("is_preview"):
    return {"out": inputs["in"]}

# In a UC function (SQL)
CASE WHEN is_preview THEN 'preview' ELSE /* actual work */ END

Conexiones del catálogo de Unity

En el caso de los operadores SQL basados en UC que llaman a API externas, use conexiones HTTP del catálogo de Unity para almacenar credenciales de forma segura:

CREATE CONNECTION my_api_connection TYPE HTTP OPTIONS (
  host 'https://api.example.com',
  port '443',
  base_path '/v1/',
  bearer_token 'your-token-here'
);

A continuación, utilice la conexión en su UDF de SQL con la función http_request(). Para obtener más información, consulte Conexión a servicios HTTP externos.

WorkspaceClient

Para los operadores de python-run-function, pueden usar WorkspaceClient de Azure Databricks para acceder a los recursos del área de trabajo y a las API externas:

def run(config, inputs, spark):
    from databricks.sdk import WorkspaceClient
    w = WorkspaceClient()
    # Use w to access workspace resources

Creación de un operador completo definido por el usuario de python-run-function

Los pasos siguientes le guían por la creación de un python-run-function operador desde cero.

Paso 1: Definir la lógica

Escribe la función run() en un cuaderno:

from typing import Dict, Any

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
    from pyspark.sql import functions as F
    df = inputs["in"]
    result = df.withColumn(config["column_name"], F.current_timestamp())
    return {"out": result}

Paso 2: Probar la función

Pruebe la función de forma interactiva con datos de ejemplo:

test_df = spark.createDataFrame(
    [("Alice", 100), ("Bob", 200)],
    ["name", "amount"]
)

result = run(
    config={"column_name": "processed_at"},
    inputs={"in": test_df},
    spark=spark
)

result["out"].show()

Paso 3: Creación de la configuración de YAML

Defina los metadatos del operador, los campos de configuración y los puertos en un archivo YAML:

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
  type: object
  properties:
    column_name:
      type: string
      title: Column Name
      default: processed_at
      x-ui:
        widget: input
  required:
    - column_name

Paso 4: Combinar la lógica y YAML

Agregue los run_function campos y ports para crear el archivo YAML completo. Guárdelo en el área de trabajo, por ejemplo /Workspace/Users/<user-name>/udos/add_timestamp.yaml:

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
  type: object
  properties:
    column_name:
      type: string
      title: Column Name
      default: processed_at
      x-ui:
        widget: input
  required:
    - column_name
ports:
  input:
    - name: in
      title: Input
  output:
    - name: out
      title: Output
run_function:
  type: inline
  code: |
    from typing import Dict, Any

    def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
        from pyspark.sql import functions as F
        df = inputs["in"]
        result = df.withColumn(config["column_name"], F.current_timestamp())
        return {"out": result}

Paso 5: Registrar el operador

Añada la ruta del archivo a su archivo .user_defined_operators.yaml:

operators:
  - /Workspace/Users/<user-name>/udos/add_timestamp.yaml

Paso 6: Uso del operador en Lakeflow Designer

Abra Lakeflow Designer y compruebe que el operador aparece en la paleta de operadores. Arrástrelo al lienzo, conecte una entrada, configure el nombre de la columna y ejecute una vista previa.

Creación de un operador completo definido por el usuario de UC

En los pasos siguientes se explica cómo crear un operador basado en uc-udf UC.

Paso 1: Definir la lógica

Escriba y pruebe la lógica de función en un cuaderno:

def double_value(input_value: float) -> float:
    if input_value is None:
        return None
    return input_value * 2

Paso 2: Creación de la configuración de YAML

Defina los metadatos del operador, los campos de configuración y los puertos:

schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: '1.0.0'
description: Doubles the input value
config:
  type: object
  properties:
    input_value:
      type: string
      title: Input Value
      format: expression
      x-ui:
        widget: expression
        port: input_data
  required:
    - input_value
ports:
  input:
    - name: input_data
      title: Input
  output:
    - name: out
      title: Output

Paso 3: Combinar la lógica y YAML

Cree la función de Unity Catalog con el YAML incrustado como una cadena de documentación:

CREATE OR REPLACE FUNCTION main.my_schema.double_value(input_value DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
  """
  schema: user-defined-operator-v0.1.0
  type: uc-udf
  name: Double Value
  id: math.double_value
  version: "1.0.0"
  description: Doubles the input value
  config:
    type: object
    properties:
      input_value:
        type: string
        title: Input Value
        format: expression
        x-ui:
          widget: expression
          port: input_data
    required:
      - input_value
  ports:
    input:
      - name: input_data
        title: Input
    output:
      - name: out
        title: Output
  """

  def double_value(input_value: float) -> float:
      if input_value is None:
          return None
      return input_value * 2

  return double_value(input_value)
$$

Paso 4: Probar la función

SELECT main.my_schema.double_value(5) AS result;
-- Should return: 10

Paso 5: Registrar el operador

Agregue la referencia de la función catálogo de Unity al .user_defined_operators.yaml archivo:

operators:
  - catalog: main
    schema: my_schema
    functionName: double_value

Paso 6: Uso del operador en Lakeflow Designer

Abra Lakeflow Designer y compruebe que el operador aparece en la paleta de operadores. Arrástrelo al lienzo, conecte una entrada y ejecute una vista previa.

Troubleshooting

Issue Solución
El operador no aparece en el Diseñador de Lakeflow. Compruebe que .user_defined_operators.yaml existe e incluye su función o la ruta de acceso del archivo. Para los operadores python-run-function, verifique la ruta del archivo y que el archivo YAML sea accesible.
Se produce un error en la validación del esquema. Compruebe el CÓDIGO YAML con el esquema oficial en https://your-workspace.cloud.databricks.com/static/schemas/user-defined-operator-v0.1.0.json.
permiso denegado. En el caso de los operadores basados en UC, compruebe que los usuarios tienen EXECUTE en la función y USE SCHEMA en el esquema. Para los operadores python-run-function, compruebe que los usuarios tienen acceso de lectura al archivo YAML.
python-run-function el operador produce un error en tiempo de ejecución. Compruebe que la firma de la run() función coincide con def run(config, inputs, spark). Compruebe que los nombres de puerto del código coincidan con YAML y que las claves de diccionario de retorno coincidan con los valores del puerto name de salida.
UDTF devuelve tipos incorrectos. Los tipos de retorno de las UDTF deben declararse explícitamente: no se puede hacer referencia a los tipos de las columnas de entrada.

Permissions

Permiso propósito
Acceso de lectura a .user_defined_operators.yaml. Descubra el operador .
Acceso de lectura al archivo YAML (python-run-function solo). Cargue la definición del operador.
EXECUTE en la función catálogo de Unity (solo operadores basados en UC). Ejecute el operador .
USE SCHEMA en el esquema (solo operadores basados en UC). Acceda al esquema donde se crea la función.
Otros permisos Según el operador, los usuarios pueden requerir otros permisos. Por ejemplo, USE CONNECTION en una conexión de Unity Catalog para llamadas a la API HTTP.

Pasos siguientes

Explore los siguientes tutoriales:

Example Tipo Description
Remitente de correo electrónico de Gmail python-run-function Enviar los datos de un DataFrame como archivo adjunto CSV en un correo electrónico a través de Gmail.
Calculadora de interés compuesto uc-udf Calcule los valores de inversión futuros mediante la fórmula de interés compuesto.
Agrupación en clústeres K-Means uc-udtf Segmente los datos en clústeres mediante scikit-learn.
Enviar mensaje de Slack uc-udf Enviar notificaciones a canales de Slack a través de la API.
Todos los widgets de interfaz de usuario uc-udf Operador de referencia que muestra todos los widgets de interfaz de usuario disponibles.

Para obtener una referencia completa al esquema YAML, consulte Referencia de YAML del operador definido por el usuario.