Remarque
L’accès à cette page requiert une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page requiert une autorisation. Vous pouvez essayer de modifier des répertoires.
Important
Cette fonctionnalité est disponible en préversion publique.
Lakeflow Designer vous permet de créer des opérateurs définis par l’utilisateur qui apparaissent directement dans le canevas avec des opérateurs intégrés. Utilisez-les pour étendre Lakeflow Designer avec votre propre logique métier, vos calculs ou vos intégrations.
Il existe trois types d’opérateurs définis par l’utilisateur :
-
python-run-function: fichier YAML autonome avec Python inline stocké dans l’espace de travail. Idéal pour les transformations de niveau DataFrame et les intégrations externes. Les autorisations sont gérées au niveau du fichier de l’espace de travail. -
uc-udf: encapsule une fonction scalaire du catalogue Unity. Idéal pour les transformations au niveau des colonnes. L’accès est régi par les autorisations du catalogue Unity. -
uc-udtf: encapsule une fonction Unity Catalog renvoyant une table. Idéal pour les transformations à l’échelle de la table, telles que le regroupement par apprentissage automatique et l’agrégation. L’accès est régi par les autorisations du catalogue Unity.
| Fonctionnalité | python-run-function |
uc-udf |
uc-udtf |
|---|---|---|---|
| Exemple de cas d’usage | Transformations de DataFrame, intégrations d’API, notifications par e-mail | Calculs au niveau des colonnes (IMC, taux d’intérêt) | Regroupement par apprentissage automatique, agrégation sur l’ensemble des lignes |
| Input | Cadres de données | Valeurs uniques | Table entière, ligne par ligne |
| Sortie | Cadres de données | Valeur unique | Table (plusieurs lignes) |
| Nécessite la fonction Catalogue Unity | Non | Oui | Oui |
| Gouvernance des accès | Autorisations des fichiers de l’espace de travail | Autorisations du catalogue Unity (EXECUTE, USE SCHEMA) |
Autorisations du catalogue Unity (EXECUTE, USE SCHEMA) |
| Langues prises en charge | Python uniquement | SQL ou Python dans un wrapper SQL | SQL ou Python dans un wrapper SQL |
Comment fonctionnent les opérateurs définis par l’utilisateur ?
Un opérateur défini par l’utilisateur se compose des éléments suivants :
-
Logique d’opérateur : code qui s’exécute lorsque l’opérateur s’exécute. Il peut s’agir d’une fonction Python
run()inline (pourpython-run-function) ou d’une fonction de catalogue Unity (pouruc-udfetuc-udtf). -
Configuration YAML : indique au Concepteur Lakeflow comment présenter l’opérateur dans l’interface utilisateur, y compris le nom, la description, les paramètres d’entrée, les widgets d’interface utilisateur et les ports de l’opérateur. Tous les types d’opérateurs utilisent le
user-defined-operator-v0.1.0schéma. -
Fichier d’inscription : entrée
.user_defined_operators.yamlqui permet au Concepteur Lakeflow de découvrir l’opérateur.
Logique d’opérateur
Fonction d’exécution Python : logique de l’opérateur défini par l’utilisateur
Chaque python-run-function opérateur doit définir une run() fonction :
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
-
config: Valeurs configurées par l’utilisateur dans l’interface utilisateur, indexées par nom de propriété. -
inputs: DataFrames d’entrée, indexées par portnamed’entrée. -
spark: La SparkSession active. -
Renvoie : un dictionnaire qui associe les valeurs du port de sortie
nameà des DataFrames.
L’exemple suivant filtre les lignes d’un DataFrame d’entrée :
def run(config, inputs, spark):
df = inputs["in"]
filtered = df.filter(config["filter_expression"])
return {"out": filtered}
Si votre opérateur requiert des packages pip externes, ajoutez le environment champ au YAML :
environment:
environment_version: '1'
dependencies:
- requests==2.31.0
- beautifulsoup4==4.12.0
Logique d’opérateur UDF et UDTF
Vous pouvez écrire des fonctions UC dans SQL ou Python. Les fonctions Python sont intégrées dans une instruction SQL CREATE FUNCTION :
Fonction SQL :
CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE SQL
RETURN
SELECT weight_kg / (height_m * height_m);
Python fonction (encapsulée dans SQL) :
CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
return weight_kg / (height_m ** 2)
$$;
Les UDF traitent une seule valeur à la fois et renvoient une valeur calculée. Les UDTF traitent les tables ligne par ligne et peuvent conserver l’état d’une ligne à l’autre. Utiliser uc-udf pour les transformations au niveau des colonnes et uc-udtf pour les opérations telles que le clustering ml ou l’agrégation.
De plus, les UDTF nécessitent la définition de trois méthodes clés : __init__(), eval() et terminate() :
class MyOperator:
def __init__(self):
# Called before processing - initialize any values needed.
def eval(self, row, id_column, columns, k):
# Called once per input row - accumulate data here.
def terminate(self):
# Called after all rows - perform final calculations and yield results.
Note
Les tables de retour UDTF doivent avoir des types fixes et explicites. Vous ne pouvez pas référencer les types de colonnes d’entrée dans la configuration de retour.
Configuration YAML
La configuration YAML indique au Concepteur Lakeflow comment présenter l’opérateur dans l’interface utilisateur. Il définit le nom, la description, les paramètres d’entrée, les widgets d’interface utilisateur et les ports de l’opérateur. Chaque champ de configuration est une propriété avec un type, un titre et des indicateurs de widget facultatifs x-ui :
config:
type: object
properties:
my_param:
type: string
title: My Parameter
x-ui:
widget: input
my_expression:
type: string
title: Column
format: expression
x-ui:
widget: expression
port: in
my_number:
type: number
title: Count
default: 10
minimum: 0
maximum: 100
required:
- my_param
- my_expression
Pour plus d’informations sur le schéma YAML, y compris tous les types de widgets et toutes les options de configuration, consultez la référence YAML de l’opérateur défini par l’utilisateur.
Ports
Les ports définissent les entrées et sorties de votre opérateur :
ports:
input:
- name: in
title: Input Data
mime: application/vnd.databricks.dataframe
required: true
allowMultiple: false
output:
- name: out
title: Output Data
YAML pour les opérateurs de fonction d’exécution Python
Pour les opérateurs python-run-function, le fichier YAML est autonome et inclut un champ run_function avec du code Python inline :
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Filter Rows
id: filter_rows
version: '1.0.0'
description: Filters rows based on a SQL expression.
config:
type: object
properties:
filter_expression:
type: string
title: Filter Expression
x-ui:
widget: input
required:
- filter_expression
ports:
input:
- name: in
title: Input
output:
- name: out
title: Output
run_function:
type: inline
code: |
def run(config, inputs, spark):
df = inputs["in"]
filtered = df.filter(config["filter_expression"])
return {"out": filtered}
Fonctions de catalogue YAML pour Unity
Pour les opérateurs basés sur l’UC, incorporez la configuration YAML en tant que commentaire ou docstring dans votre fonction.
Dans SQL (utiliser /* ... */ un commentaire) :
RETURN(/*
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Calculate BMI
id: calculate_bmi
version: "1.0.0"
description: Calculates BMI from weight and height.
config:
type: object
properties:
weight_kg:
type: string
title: Weight (in kg)
format: expression
x-ui:
widget: expression
port: in
height_m:
type: string
title: Height (in meters)
format: expression
x-ui:
widget: expression
port: in
required:
- weight_kg
- height_m
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
*/
SELECT weight_kg / (height_m * height_m)
);
In Python (utilisez """ ... """ docstring) :
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Calculate BMI
id: calculate_bmi
version: "1.0.0"
description: Calculates BMI from weight and height.
config:
type: object
properties:
weight_kg:
type: string
title: Weight (in kg)
format: expression
x-ui:
widget: expression
port: in
height_m:
type: string
title: Height (in meters)
format: expression
x-ui:
widget: expression
port: in
required:
- weight_kg
- height_m
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
"""
return weight_kg / (height_m ** 2)
$$;
Inscrire et déployer votre opérateur sur Lakeflow Designer
Pour que votre opérateur apparaisse dans Lakeflow Designer, inscrivez-le dans un .user_defined_operators.yaml fichier :
- Niveau de l’espace de travail : Placez le fichier à la racine de votre espace de travail pour rendre l’opérateur visible par tous les utilisateurs.
-
Niveau utilisateur : Placez le fichier dans votre dossier d’accueil de l’utilisateur (
/Workspace/Users/<user-name>/.user_defined_operators.yaml) pour rendre les opérateurs visibles uniquement pour vous.
La section operators: prend en charge les chemins d’accès aux fichiers, les références à des fonctions Unity Catalog et les motifs glob. Vous pouvez combiner des types d’entrée :
operators:
# File path (python-run-function operators)
- /Workspace/Users/me/udos/my_operator.yaml
# Glob pattern (registers all matching files)
- /Workspace/Users/me/udos/transforms/*.yaml
# UC function reference (uc-udf and uc-udtf operators)
- catalog: my_catalog
schema: my_schema
functionName: my_function
Configurations avancées
Mode aperçu
Lakeflow Designer prend en charge les aperçus en mode de conception. Pour les opérateurs qui appellent des API externes ou écrivent dans des systèmes externes, ajoutez une is_preview propriété de configuration pour que vous puissiez ignorer les effets secondaires pendant la préversion. Lorsque le mode d’aperçu est activé, les utilisateurs doivent cliquer explicitement sur Exécuter pour exécuter l’opérateur avec des effets secondaires.
config:
type: object
properties:
is_preview:
type: boolean
format: is_preview
default: false
Lakeflow Designer définit automatiquement cette valeur true pendant la préversion. Vérifiez-le dans votre logique pour ignorer les effets secondaires :
# In a python-run-function
if config.get("is_preview"):
return {"out": inputs["in"]}
# In a UC function (SQL)
CASE WHEN is_preview THEN 'preview' ELSE /* actual work */ END
Connexions du catalogue Unity
Pour les opérateurs SQL basés sur l’UC qui appellent des API externes, utilisez les connexions HTTP du catalogue Unity pour stocker en toute sécurité les informations d’identification :
CREATE CONNECTION my_api_connection TYPE HTTP OPTIONS (
host 'https://api.example.com',
port '443',
base_path '/v1/',
bearer_token 'your-token-here'
);
Utilisez ensuite la connexion dans votre UDF SQL avec la fonction http_request(). Pour plus d’informations, consultez Se connecter aux services HTTP externes.
WorkspaceClient
Pour les opérateurs python-run-function, vous pouvez utiliser le Azure Databricks WorkspaceClient pour accéder aux ressources de l’espace de travail et aux API externes :
def run(config, inputs, spark):
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Use w to access workspace resources
Créer un opérateur python-run-function complet défini par l’utilisateur
Les étapes suivantes expliquent comment créer un python-run-function opérateur à partir de zéro.
Étape 1 : Définir la logique
Écrivez votre fonction run() dans un carnet :
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
from pyspark.sql import functions as F
df = inputs["in"]
result = df.withColumn(config["column_name"], F.current_timestamp())
return {"out": result}
Étape 2 : Tester la fonction
Testez la fonction de manière interactive avec des exemples de données :
test_df = spark.createDataFrame(
[("Alice", 100), ("Bob", 200)],
["name", "amount"]
)
result = run(
config={"column_name": "processed_at"},
inputs={"in": test_df},
spark=spark
)
result["out"].show()
Étape 3 : Créer la configuration YAML
Définissez les métadonnées d’opérateur, les champs de configuration et les ports dans un fichier YAML :
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
type: object
properties:
column_name:
type: string
title: Column Name
default: processed_at
x-ui:
widget: input
required:
- column_name
Étape 4 : Combiner la logique et YAML
Ajoutez les champs run_function et ports pour créer le fichier YAML complet. Enregistrez-le dans votre espace de travail, par exemple /Workspace/Users/<user-name>/udos/add_timestamp.yaml:
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
type: object
properties:
column_name:
type: string
title: Column Name
default: processed_at
x-ui:
widget: input
required:
- column_name
ports:
input:
- name: in
title: Input
output:
- name: out
title: Output
run_function:
type: inline
code: |
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
from pyspark.sql import functions as F
df = inputs["in"]
result = df.withColumn(config["column_name"], F.current_timestamp())
return {"out": result}
Étape 5 : Inscrire l’opérateur
Ajoutez le chemin du fichier à votre .user_defined_operators.yaml fichier :
operators:
- /Workspace/Users/<user-name>/udos/add_timestamp.yaml
Étape 6 : Utiliser l’opérateur dans Lakeflow Designer
Ouvrez Lakeflow Designer et vérifiez que l’opérateur apparaît dans la palette d’opérateurs. Faites-le glisser sur le canevas, connectez une entrée, configurez le nom de la colonne et exécutez un aperçu.
Créer un opérateur UC complet défini par l’utilisateur
Les étapes suivantes décrivent la création d’un opérateur basé sur l’UC uc-udf.
Étape 1 : Définir la logique
Écrivez et testez votre logique de fonction dans un notebook :
def double_value(input_value: float) -> float:
if input_value is None:
return None
return input_value * 2
Étape 2 : Créer la configuration YAML
Définissez les métadonnées d’opérateur, les champs de configuration et les ports :
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: '1.0.0'
description: Doubles the input value
config:
type: object
properties:
input_value:
type: string
title: Input Value
format: expression
x-ui:
widget: expression
port: input_data
required:
- input_value
ports:
input:
- name: input_data
title: Input
output:
- name: out
title: Output
Étape 3 : Combiner la logique et YAML
Créez la fonction catalogue Unity avec le yaML incorporé en tant que docstring :
CREATE OR REPLACE FUNCTION main.my_schema.double_value(input_value DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: "1.0.0"
description: Doubles the input value
config:
type: object
properties:
input_value:
type: string
title: Input Value
format: expression
x-ui:
widget: expression
port: input_data
required:
- input_value
ports:
input:
- name: input_data
title: Input
output:
- name: out
title: Output
"""
def double_value(input_value: float) -> float:
if input_value is None:
return None
return input_value * 2
return double_value(input_value)
$$
Étape 4 : Tester la fonction
SELECT main.my_schema.double_value(5) AS result;
-- Should return: 10
Étape 5 : Inscrire l’opérateur
Ajoutez la référence de la fonction catalogue Unity à votre .user_defined_operators.yaml fichier :
operators:
- catalog: main
schema: my_schema
functionName: double_value
Étape 6 : Utiliser l’opérateur dans Lakeflow Designer
Ouvrez Lakeflow Designer et vérifiez que l’opérateur apparaît dans la palette d’opérateurs. Faites-le glisser sur le canevas, connectez une entrée et exécutez un aperçu.
Troubleshooting
| Issue | Solution |
|---|---|
| L’opérateur n’apparaît pas dans Lakeflow Designer. | Vérifiez que .user_defined_operators.yaml existe et répertorie votre fonction ou votre chemin de fichier. Pour les opérateurs python-run-function, vérifiez le chemin du fichier et que le fichier YAML est accessible. |
| Échec de la validation du schéma. | Vérifiez votre YAML sur le schéma officiel à l’adresse https://your-workspace.cloud.databricks.com/static/schemas/user-defined-operator-v0.1.0.json. |
| Permission refusée. | Pour les opérateurs basés sur l’UC, vérifiez que les utilisateurs ont EXECUTE sur la fonction et USE SCHEMA sur le schéma. Pour les opérateurs python-run-function, vérifiez que les utilisateurs disposent d’un accès en lecture au fichier YAML. |
python-run-function l’opérateur échoue au moment de l’exécution. |
Vérifiez que la signature de fonction run() correspond à def run(config, inputs, spark). Vérifiez que les noms de port dans le code correspondent à YAML et que les clés de dictionnaire renvoyées correspondent aux valeurs de port name de sortie. |
| L’UDTF retourne des types incorrects. | Les types de retour UDTF doivent être explicites . Vous ne pouvez pas référencer les types de colonnes d’entrée. |
Permissions
| Autorisation | Purpose |
|---|---|
Accès en lecture à .user_defined_operators.yaml. |
Découvrez l’opérateur. |
Accès en lecture au fichier YAML (python-run-function uniquement). |
Chargez la définition de l’opérateur. |
| EXECUTE sur la fonction catalogue Unity (opérateurs basés sur UC uniquement). | Lancez l’opérateur. |
| USE SCHEMA sur le schéma (opérateurs basés sur l’UC uniquement). | Accédez au schéma où la fonction est créée. |
| Autres autorisations | Selon votre opérateur, les utilisateurs peuvent nécessiter d’autres autorisations. Par exemple, USE CONNECTION sur une connexion Unity Catalog pour les appels à l’API HTTP. |
Étapes suivantes
Explorez les didacticiels suivants :
| Example | Catégorie | Description |
|---|---|---|
| Expéditeur d’e-mail Gmail | python-run-function |
Envoyez des données DataFrame sous forme de pièce jointe de messagerie CSV via Gmail. |
| Calculatrice d’intérêt composé | uc-udf |
Calculez les valeurs futures d’investissement à l’aide de la formule d’intérêt composée. |
| Clustering k-moyennes | uc-udtf |
Segmentez les données en clusters à l’aide de scikit-learn. |
| Envoyer un message Slack | uc-udf |
Envoyez des notifications aux canaux Slack via l’API. |
| Tous les widgets d’interface utilisateur | uc-udf |
Opérateur de référence présentant tous les widgets d’interface utilisateur disponibles. |
Pour obtenir une référence complète au schéma YAML, consultez la référence YAML de l’opérateur défini par l’utilisateur.