Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Important
Esta característica está en versión preliminar pública.
Lakeflow Designer permite crear operadores definidos por el usuario que aparecen directamente en el lienzo junto con operadores integrados. Úselos para ampliar Lakeflow Designer con su propia lógica de negocios, cálculos o integraciones.
Hay tres tipos de operadores definidos por el usuario:
-
python-run-function: un archivo YAML independiente con código Python integrado almacenado en el área de trabajo. Mejor para las transformaciones de nivel de DataFrame y las integraciones externas. Los permisos se administran en el nivel de archivo del área de trabajo. -
uc-udf: Encapsula una función escalar de Unity Catalog. Ideal para transformaciones a nivel de columna. El acceso se rige por los permisos del catálogo de Unity. -
uc-udtf: Encapsula una función de tabla de Unity Catalog. Lo mejor para las transformaciones de nivel de tabla, como la agrupación en clústeres de ML y la agregación. El acceso se rige por los permisos del catálogo de Unity.
| Feature | python-run-function |
uc-udf |
uc-udtf |
|---|---|---|---|
| Ejemplo de caso de uso | Transformaciones de DataFrame, integraciones de API, notificaciones por correo electrónico | Cálculos de nivel de columna (IMC, tasas de interés) | Agrupación por clústeres de aprendizaje automático, agregación a través de las filas |
| Entrada | Marcos de datos | Valores únicos | Tabla completa, fila por fila |
| Output | Marcos de datos | Valor único | Tabla (varias filas) |
| Requiere la función catálogo de Unity | No | Sí | Sí |
| Gobernanza del acceso | Permisos de archivo del área de trabajo | Permisos de Unity Catalog (EXECUTE, USE SCHEMA) |
Permisos de Unity Catalog (EXECUTE, USE SCHEMA) |
| Idiomas compatibles | Únicamente Python | SQL o Python en un contenedor de SQL | SQL o Python en un contenedor de SQL |
¿Cómo funcionan los operadores definidos por el usuario?
Un operador definido por el usuario consta de:
-
Lógica del operador: código que se ejecuta cuando se ejecuta el operador. Puede ser una función de Python en línea
run()(parapython-run-function) o una función de Unity Catalog (parauc-udfyuc-udtf). -
Configuración de YAML: indica al Diseñador de Lakeflow cómo presentar el operador en la interfaz de usuario, incluido el nombre del operador, la descripción, los parámetros de entrada, los widgets de interfaz de usuario y los puertos. Todos los tipos de operador usan el
user-defined-operator-v0.1.0esquema. -
Archivo de registro: una entrada en
.user_defined_operators.yamlque permite que Lakeflow Designer detecte el operador.
Lógica del operador
Lógica del operador definido por el usuario de la función run de Python
Cada python-run-function operador debe definir una run() función:
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
-
config: Valores configurados por el usuario en la interfaz de usuario, indexados por nombre de propiedad. -
inputs: DataFrames de entrada, indexados por el puerto de entradaname. -
spark: La SparkSession activa. -
Devuelve: Un diccionario que asigna los valores del puerto de salida
namea DataFrames.
En el ejemplo siguiente se filtran las filas de un dataframe de entrada:
def run(config, inputs, spark):
df = inputs["in"]
filtered = df.filter(config["filter_expression"])
return {"out": filtered}
Si el operador requiere paquetes pip externos, agregue el environment campo a YAML:
environment:
environment_version: '1'
dependencies:
- requests==2.31.0
- beautifulsoup4==4.12.0
Lógica del operador UDF y UDTF
Puede escribir funciones uc en SQL o Python. Las funciones de Python se envuelven en una instrucción SQL CREATE FUNCTION:
Función SQL:
CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE SQL
RETURN
SELECT weight_kg / (height_m * height_m);
función Python (encapsulada en SQL):
CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
return weight_kg / (height_m ** 2)
$$;
Las UDF procesan un único valor a la vez y devuelven un valor calculado. Las UDTF procesan tablas fila por fila y pueden mantener un estado a lo largo de todas las filas. Se usa uc-udf para transformaciones de nivel de columna y uc-udtf para operaciones como la agrupación en clústeres de ML o la agregación.
Además, las UDTF requieren que defina tres métodos clave: __init__(), eval() y terminate():
class MyOperator:
def __init__(self):
# Called before processing - initialize any values needed.
def eval(self, row, id_column, columns, k):
# Called once per input row - accumulate data here.
def terminate(self):
# Called after all rows - perform final calculations and yield results.
Note
Las tablas de retorno UDTF deben tener tipos fijos y explícitos. No se puede hacer referencia a los tipos de columna de entrada en la configuración de retorno.
Configuración de YAML
La configuración de YAML indica al Diseñador de Lakeflow cómo presentar el operador en la interfaz de usuario. Define el nombre, la descripción, los parámetros de entrada, los widgets de interfaz de usuario y los puertos del operador. Cada campo de configuración es una propiedad con un tipo, un título e indicaciones opcionales del widget x-ui:
config:
type: object
properties:
my_param:
type: string
title: My Parameter
x-ui:
widget: input
my_expression:
type: string
title: Column
format: expression
x-ui:
widget: expression
port: in
my_number:
type: number
title: Count
default: 10
minimum: 0
maximum: 100
required:
- my_param
- my_expression
Para obtener información completa sobre el esquema YAML, incluidos todos los tipos de widget y las opciones de configuración, consulte Referencia de YAML del operador definido por el usuario.
Puertos
Los puertos definen las entradas y salidas del operador:
ports:
input:
- name: in
title: Input Data
mime: application/vnd.databricks.dataframe
required: true
allowMultiple: false
output:
- name: out
title: Output Data
YAML para operadores de la función run de Python
Para operadores de python-run-function, el archivo YAML es independiente e incluye un campo run_function con código Python integrado:
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Filter Rows
id: filter_rows
version: '1.0.0'
description: Filters rows based on a SQL expression.
config:
type: object
properties:
filter_expression:
type: string
title: Filter Expression
x-ui:
widget: input
required:
- filter_expression
ports:
input:
- name: in
title: Input
output:
- name: out
title: Output
run_function:
type: inline
code: |
def run(config, inputs, spark):
df = inputs["in"]
filtered = df.filter(config["filter_expression"])
return {"out": filtered}
Funciones de catálogo de YAML para Unity
En el caso de los operadores basados en UC, inserte la configuración de YAML como comentario o docstring en la función.
En SQL (use el comentario /* ... */):
RETURN(/*
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Calculate BMI
id: calculate_bmi
version: "1.0.0"
description: Calculates BMI from weight and height.
config:
type: object
properties:
weight_kg:
type: string
title: Weight (in kg)
format: expression
x-ui:
widget: expression
port: in
height_m:
type: string
title: Height (in meters)
format: expression
x-ui:
widget: expression
port: in
required:
- weight_kg
- height_m
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
*/
SELECT weight_kg / (height_m * height_m)
);
In Python (use """ ... """ docstring):
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Calculate BMI
id: calculate_bmi
version: "1.0.0"
description: Calculates BMI from weight and height.
config:
type: object
properties:
weight_kg:
type: string
title: Weight (in kg)
format: expression
x-ui:
widget: expression
port: in
height_m:
type: string
title: Height (in meters)
format: expression
x-ui:
widget: expression
port: in
required:
- weight_kg
- height_m
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
"""
return weight_kg / (height_m ** 2)
$$;
Registro e implementación del operador en Lakeflow Designer
Para que el operador aparezca en Lakeflow Designer, regístrelo en un .user_defined_operators.yaml archivo:
- Nivel de área de trabajo: Coloque el archivo en la raíz del área de trabajo para que el operador sea visible para todos los usuarios.
-
Nivel de usuario: Coloque el archivo en la carpeta principal del usuario (
/Workspace/Users/<user-name>/.user_defined_operators.yaml) para que los operadores sean visibles solo para usted.
La operators: sección admite rutas de acceso de archivo, referencias a funciones de Catálogo de Unity y patrones globales. Puede mezclar tipos de entrada:
operators:
# File path (python-run-function operators)
- /Workspace/Users/me/udos/my_operator.yaml
# Glob pattern (registers all matching files)
- /Workspace/Users/me/udos/transforms/*.yaml
# UC function reference (uc-udf and uc-udtf operators)
- catalog: my_catalog
schema: my_schema
functionName: my_function
Configuraciones avanzadas
Modo de vista previa
Lakeflow Designer admite vistas previas mientras están en modo de diseño. Para los operadores que realizan llamadas a API externas o escriben en sistemas externos, añade una propiedad de configuración is_preview para poder omitir los efectos secundarios durante la vista previa. Cuando el modo de vista previa está habilitado, los usuarios deben hacer clic explícitamente en Ejecutar para ejecutar el operador con efectos secundarios.
config:
type: object
properties:
is_preview:
type: boolean
format: is_preview
default: false
Lakeflow Designer establece automáticamente este valor a true durante la vista previa. Compruébalo en la lógica para omitir los efectos secundarios:
# In a python-run-function
if config.get("is_preview"):
return {"out": inputs["in"]}
# In a UC function (SQL)
CASE WHEN is_preview THEN 'preview' ELSE /* actual work */ END
Conexiones del catálogo de Unity
En el caso de los operadores SQL basados en UC que llaman a API externas, use conexiones HTTP del catálogo de Unity para almacenar credenciales de forma segura:
CREATE CONNECTION my_api_connection TYPE HTTP OPTIONS (
host 'https://api.example.com',
port '443',
base_path '/v1/',
bearer_token 'your-token-here'
);
A continuación, utilice la conexión en su UDF de SQL con la función http_request(). Para obtener más información, consulte Conexión a servicios HTTP externos.
WorkspaceClient
Para los operadores de python-run-function, pueden usar WorkspaceClient de Azure Databricks para acceder a los recursos del área de trabajo y a las API externas:
def run(config, inputs, spark):
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Use w to access workspace resources
Creación de un operador completo definido por el usuario de python-run-function
Los pasos siguientes le guían por la creación de un python-run-function operador desde cero.
Paso 1: Definir la lógica
Escribe la función run() en un cuaderno:
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
from pyspark.sql import functions as F
df = inputs["in"]
result = df.withColumn(config["column_name"], F.current_timestamp())
return {"out": result}
Paso 2: Probar la función
Pruebe la función de forma interactiva con datos de ejemplo:
test_df = spark.createDataFrame(
[("Alice", 100), ("Bob", 200)],
["name", "amount"]
)
result = run(
config={"column_name": "processed_at"},
inputs={"in": test_df},
spark=spark
)
result["out"].show()
Paso 3: Creación de la configuración de YAML
Defina los metadatos del operador, los campos de configuración y los puertos en un archivo YAML:
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
type: object
properties:
column_name:
type: string
title: Column Name
default: processed_at
x-ui:
widget: input
required:
- column_name
Paso 4: Combinar la lógica y YAML
Agregue los run_function campos y ports para crear el archivo YAML completo. Guárdelo en el área de trabajo, por ejemplo /Workspace/Users/<user-name>/udos/add_timestamp.yaml:
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
type: object
properties:
column_name:
type: string
title: Column Name
default: processed_at
x-ui:
widget: input
required:
- column_name
ports:
input:
- name: in
title: Input
output:
- name: out
title: Output
run_function:
type: inline
code: |
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
from pyspark.sql import functions as F
df = inputs["in"]
result = df.withColumn(config["column_name"], F.current_timestamp())
return {"out": result}
Paso 5: Registrar el operador
Añada la ruta del archivo a su archivo .user_defined_operators.yaml:
operators:
- /Workspace/Users/<user-name>/udos/add_timestamp.yaml
Paso 6: Uso del operador en Lakeflow Designer
Abra Lakeflow Designer y compruebe que el operador aparece en la paleta de operadores. Arrástrelo al lienzo, conecte una entrada, configure el nombre de la columna y ejecute una vista previa.
Creación de un operador completo definido por el usuario de UC
En los pasos siguientes se explica cómo crear un operador basado en uc-udf UC.
Paso 1: Definir la lógica
Escriba y pruebe la lógica de función en un cuaderno:
def double_value(input_value: float) -> float:
if input_value is None:
return None
return input_value * 2
Paso 2: Creación de la configuración de YAML
Defina los metadatos del operador, los campos de configuración y los puertos:
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: '1.0.0'
description: Doubles the input value
config:
type: object
properties:
input_value:
type: string
title: Input Value
format: expression
x-ui:
widget: expression
port: input_data
required:
- input_value
ports:
input:
- name: input_data
title: Input
output:
- name: out
title: Output
Paso 3: Combinar la lógica y YAML
Cree la función de Unity Catalog con el YAML incrustado como una cadena de documentación:
CREATE OR REPLACE FUNCTION main.my_schema.double_value(input_value DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: "1.0.0"
description: Doubles the input value
config:
type: object
properties:
input_value:
type: string
title: Input Value
format: expression
x-ui:
widget: expression
port: input_data
required:
- input_value
ports:
input:
- name: input_data
title: Input
output:
- name: out
title: Output
"""
def double_value(input_value: float) -> float:
if input_value is None:
return None
return input_value * 2
return double_value(input_value)
$$
Paso 4: Probar la función
SELECT main.my_schema.double_value(5) AS result;
-- Should return: 10
Paso 5: Registrar el operador
Agregue la referencia de la función catálogo de Unity al .user_defined_operators.yaml archivo:
operators:
- catalog: main
schema: my_schema
functionName: double_value
Paso 6: Uso del operador en Lakeflow Designer
Abra Lakeflow Designer y compruebe que el operador aparece en la paleta de operadores. Arrástrelo al lienzo, conecte una entrada y ejecute una vista previa.
Troubleshooting
| Issue | Solución |
|---|---|
| El operador no aparece en el Diseñador de Lakeflow. | Compruebe que .user_defined_operators.yaml existe e incluye su función o la ruta de acceso del archivo. Para los operadores python-run-function, verifique la ruta del archivo y que el archivo YAML sea accesible. |
| Se produce un error en la validación del esquema. | Compruebe el CÓDIGO YAML con el esquema oficial en https://your-workspace.cloud.databricks.com/static/schemas/user-defined-operator-v0.1.0.json. |
| permiso denegado. | En el caso de los operadores basados en UC, compruebe que los usuarios tienen EXECUTE en la función y USE SCHEMA en el esquema. Para los operadores python-run-function, compruebe que los usuarios tienen acceso de lectura al archivo YAML. |
python-run-function el operador produce un error en tiempo de ejecución. |
Compruebe que la firma de la run() función coincide con def run(config, inputs, spark). Compruebe que los nombres de puerto del código coincidan con YAML y que las claves de diccionario de retorno coincidan con los valores del puerto name de salida. |
| UDTF devuelve tipos incorrectos. | Los tipos de retorno de las UDTF deben declararse explícitamente: no se puede hacer referencia a los tipos de las columnas de entrada. |
Permissions
| Permiso | propósito |
|---|---|
Acceso de lectura a .user_defined_operators.yaml. |
Descubra el operador . |
Acceso de lectura al archivo YAML (python-run-function solo). |
Cargue la definición del operador. |
| EXECUTE en la función catálogo de Unity (solo operadores basados en UC). | Ejecute el operador . |
| USE SCHEMA en el esquema (solo operadores basados en UC). | Acceda al esquema donde se crea la función. |
| Otros permisos | Según el operador, los usuarios pueden requerir otros permisos. Por ejemplo, USE CONNECTION en una conexión de Unity Catalog para llamadas a la API HTTP. |
Pasos siguientes
Explore los siguientes tutoriales:
| Example | Tipo | Description |
|---|---|---|
| Remitente de correo electrónico de Gmail | python-run-function |
Enviar los datos de un DataFrame como archivo adjunto CSV en un correo electrónico a través de Gmail. |
| Calculadora de interés compuesto | uc-udf |
Calcule los valores de inversión futuros mediante la fórmula de interés compuesto. |
| Agrupación en clústeres K-Means | uc-udtf |
Segmente los datos en clústeres mediante scikit-learn. |
| Enviar mensaje de Slack | uc-udf |
Enviar notificaciones a canales de Slack a través de la API. |
| Todos los widgets de interfaz de usuario | uc-udf |
Operador de referencia que muestra todos los widgets de interfaz de usuario disponibles. |
Para obtener una referencia completa al esquema YAML, consulte Referencia de YAML del operador definido por el usuario.