Opérateurs définis par l’utilisateur dans Lakeflow Designer

Important

Cette fonctionnalité est disponible en préversion publique.

Lakeflow Designer vous permet de créer des opérateurs définis par l’utilisateur qui apparaissent directement dans le canevas avec des opérateurs intégrés. Utilisez-les pour étendre Lakeflow Designer avec votre propre logique métier, vos calculs ou vos intégrations.

Il existe trois types d’opérateurs définis par l’utilisateur :

  • python-run-function : fichier YAML autonome avec Python inline stocké dans l’espace de travail. Idéal pour les transformations de niveau DataFrame et les intégrations externes. Les autorisations sont gérées au niveau du fichier de l’espace de travail.
  • uc-udf: encapsule une fonction scalaire du catalogue Unity. Idéal pour les transformations au niveau des colonnes. L’accès est régi par les autorisations du catalogue Unity.
  • uc-udtf : encapsule une fonction Unity Catalog renvoyant une table. Idéal pour les transformations à l’échelle de la table, telles que le regroupement par apprentissage automatique et l’agrégation. L’accès est régi par les autorisations du catalogue Unity.
Fonctionnalité python-run-function uc-udf uc-udtf
Exemple de cas d’usage Transformations de DataFrame, intégrations d’API, notifications par e-mail Calculs au niveau des colonnes (IMC, taux d’intérêt) Regroupement par apprentissage automatique, agrégation sur l’ensemble des lignes
Input Cadres de données Valeurs uniques Table entière, ligne par ligne
Sortie Cadres de données Valeur unique Table (plusieurs lignes)
Nécessite la fonction Catalogue Unity Non Oui Oui
Gouvernance des accès Autorisations des fichiers de l’espace de travail Autorisations du catalogue Unity (EXECUTE, USE SCHEMA) Autorisations du catalogue Unity (EXECUTE, USE SCHEMA)
Langues prises en charge Python uniquement SQL ou Python dans un wrapper SQL SQL ou Python dans un wrapper SQL

Comment fonctionnent les opérateurs définis par l’utilisateur ?

Un opérateur défini par l’utilisateur se compose des éléments suivants :

  • Logique d’opérateur : code qui s’exécute lorsque l’opérateur s’exécute. Il peut s’agir d’une fonction Python run() inline (pour python-run-function) ou d’une fonction de catalogue Unity (pour uc-udf et uc-udtf).
  • Configuration YAML : indique au Concepteur Lakeflow comment présenter l’opérateur dans l’interface utilisateur, y compris le nom, la description, les paramètres d’entrée, les widgets d’interface utilisateur et les ports de l’opérateur. Tous les types d’opérateurs utilisent le user-defined-operator-v0.1.0 schéma.
  • Fichier d’inscription : entrée .user_defined_operators.yaml qui permet au Concepteur Lakeflow de découvrir l’opérateur.

Logique d’opérateur

Fonction d’exécution Python : logique de l’opérateur défini par l’utilisateur

Chaque python-run-function opérateur doit définir une run() fonction :

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
  • config: Valeurs configurées par l’utilisateur dans l’interface utilisateur, indexées par nom de propriété.
  • inputs: DataFrames d’entrée, indexées par port named’entrée.
  • spark: La SparkSession active.
  • Renvoie : un dictionnaire qui associe les valeurs du port de sortie name à des DataFrames.

L’exemple suivant filtre les lignes d’un DataFrame d’entrée :

def run(config, inputs, spark):
    df = inputs["in"]
    filtered = df.filter(config["filter_expression"])
    return {"out": filtered}

Si votre opérateur requiert des packages pip externes, ajoutez le environment champ au YAML :

environment:
  environment_version: '1'
  dependencies:
    - requests==2.31.0
    - beautifulsoup4==4.12.0

Logique d’opérateur UDF et UDTF

Vous pouvez écrire des fonctions UC dans SQL ou Python. Les fonctions Python sont intégrées dans une instruction SQL CREATE FUNCTION :

Fonction SQL :

CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE SQL
RETURN
  SELECT weight_kg / (height_m * height_m);

Python fonction (encapsulée dans SQL) :

CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
  return weight_kg / (height_m ** 2)
$$;

Les UDF traitent une seule valeur à la fois et renvoient une valeur calculée. Les UDTF traitent les tables ligne par ligne et peuvent conserver l’état d’une ligne à l’autre. Utiliser uc-udf pour les transformations au niveau des colonnes et uc-udtf pour les opérations telles que le clustering ml ou l’agrégation.

De plus, les UDTF nécessitent la définition de trois méthodes clés : __init__(), eval() et terminate() :

class MyOperator:
    def __init__(self):
        # Called before processing - initialize any values needed.

    def eval(self, row, id_column, columns, k):
        # Called once per input row - accumulate data here.

    def terminate(self):
        # Called after all rows - perform final calculations and yield results.

Note

Les tables de retour UDTF doivent avoir des types fixes et explicites. Vous ne pouvez pas référencer les types de colonnes d’entrée dans la configuration de retour.

Configuration YAML

La configuration YAML indique au Concepteur Lakeflow comment présenter l’opérateur dans l’interface utilisateur. Il définit le nom, la description, les paramètres d’entrée, les widgets d’interface utilisateur et les ports de l’opérateur. Chaque champ de configuration est une propriété avec un type, un titre et des indicateurs de widget facultatifs x-ui :

config:
  type: object
  properties:
    my_param:
      type: string
      title: My Parameter
      x-ui:
        widget: input
    my_expression:
      type: string
      title: Column
      format: expression
      x-ui:
        widget: expression
        port: in
    my_number:
      type: number
      title: Count
      default: 10
      minimum: 0
      maximum: 100
  required:
    - my_param
    - my_expression

Pour plus d’informations sur le schéma YAML, y compris tous les types de widgets et toutes les options de configuration, consultez la référence YAML de l’opérateur défini par l’utilisateur.

Ports

Les ports définissent les entrées et sorties de votre opérateur :

ports:
  input:
    - name: in
      title: Input Data
      mime: application/vnd.databricks.dataframe
      required: true
      allowMultiple: false
  output:
    - name: out
      title: Output Data

YAML pour les opérateurs de fonction d’exécution Python

Pour les opérateurs python-run-function, le fichier YAML est autonome et inclut un champ run_function avec du code Python inline :

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Filter Rows
id: filter_rows
version: '1.0.0'
description: Filters rows based on a SQL expression.
config:
  type: object
  properties:
    filter_expression:
      type: string
      title: Filter Expression
      x-ui:
        widget: input
  required:
    - filter_expression
ports:
  input:
    - name: in
      title: Input
  output:
    - name: out
      title: Output
run_function:
  type: inline
  code: |
    def run(config, inputs, spark):
        df = inputs["in"]
        filtered = df.filter(config["filter_expression"])
        return {"out": filtered}

Fonctions de catalogue YAML pour Unity

Pour les opérateurs basés sur l’UC, incorporez la configuration YAML en tant que commentaire ou docstring dans votre fonction.

Dans SQL (utiliser /* ... */ un commentaire) :

RETURN(/*
  schema: user-defined-operator-v0.1.0
  type: uc-udf
  name: Calculate BMI
  id: calculate_bmi
  version: "1.0.0"
  description: Calculates BMI from weight and height.
  config:
    type: object
    properties:
      weight_kg:
        type: string
        title: Weight (in kg)
        format: expression
        x-ui:
          widget: expression
          port: in
      height_m:
        type: string
        title: Height (in meters)
        format: expression
        x-ui:
          widget: expression
          port: in
    required:
      - weight_kg
      - height_m
  ports:
    input:
      - name: in
        title: Input Data
    output:
      - name: out
        title: Output
    */
  SELECT weight_kg / (height_m * height_m)
);

In Python (utilisez """ ... """ docstring) :

AS $$
  """
  schema: user-defined-operator-v0.1.0
  type: uc-udf
  name: Calculate BMI
  id: calculate_bmi
  version: "1.0.0"
  description: Calculates BMI from weight and height.
  config:
    type: object
    properties:
      weight_kg:
        type: string
        title: Weight (in kg)
        format: expression
        x-ui:
          widget: expression
          port: in
      height_m:
        type: string
        title: Height (in meters)
        format: expression
        x-ui:
          widget: expression
          port: in
    required:
      - weight_kg
      - height_m
  ports:
    input:
      - name: in
        title: Input Data
    output:
      - name: out
        title: Output
  """

  return weight_kg / (height_m ** 2)
$$;

Inscrire et déployer votre opérateur sur Lakeflow Designer

Pour que votre opérateur apparaisse dans Lakeflow Designer, inscrivez-le dans un .user_defined_operators.yaml fichier :

  • Niveau de l’espace de travail : Placez le fichier à la racine de votre espace de travail pour rendre l’opérateur visible par tous les utilisateurs.
  • Niveau utilisateur : Placez le fichier dans votre dossier d’accueil de l’utilisateur (/Workspace/Users/<user-name>/.user_defined_operators.yaml) pour rendre les opérateurs visibles uniquement pour vous.

La section operators: prend en charge les chemins d’accès aux fichiers, les références à des fonctions Unity Catalog et les motifs glob. Vous pouvez combiner des types d’entrée :

operators:
  # File path (python-run-function operators)
  - /Workspace/Users/me/udos/my_operator.yaml
  # Glob pattern (registers all matching files)
  - /Workspace/Users/me/udos/transforms/*.yaml
  # UC function reference (uc-udf and uc-udtf operators)
  - catalog: my_catalog
    schema: my_schema
    functionName: my_function

Configurations avancées

Mode aperçu

Lakeflow Designer prend en charge les aperçus en mode de conception. Pour les opérateurs qui appellent des API externes ou écrivent dans des systèmes externes, ajoutez une is_preview propriété de configuration pour que vous puissiez ignorer les effets secondaires pendant la préversion. Lorsque le mode d’aperçu est activé, les utilisateurs doivent cliquer explicitement sur Exécuter pour exécuter l’opérateur avec des effets secondaires.

config:
  type: object
  properties:
    is_preview:
      type: boolean
      format: is_preview
      default: false

Lakeflow Designer définit automatiquement cette valeur true pendant la préversion. Vérifiez-le dans votre logique pour ignorer les effets secondaires :

# In a python-run-function
if config.get("is_preview"):
    return {"out": inputs["in"]}

# In a UC function (SQL)
CASE WHEN is_preview THEN 'preview' ELSE /* actual work */ END

Connexions du catalogue Unity

Pour les opérateurs SQL basés sur l’UC qui appellent des API externes, utilisez les connexions HTTP du catalogue Unity pour stocker en toute sécurité les informations d’identification :

CREATE CONNECTION my_api_connection TYPE HTTP OPTIONS (
  host 'https://api.example.com',
  port '443',
  base_path '/v1/',
  bearer_token 'your-token-here'
);

Utilisez ensuite la connexion dans votre UDF SQL avec la fonction http_request(). Pour plus d’informations, consultez Se connecter aux services HTTP externes.

WorkspaceClient

Pour les opérateurs python-run-function, vous pouvez utiliser le Azure Databricks WorkspaceClient pour accéder aux ressources de l’espace de travail et aux API externes :

def run(config, inputs, spark):
    from databricks.sdk import WorkspaceClient
    w = WorkspaceClient()
    # Use w to access workspace resources

Créer un opérateur python-run-function complet défini par l’utilisateur

Les étapes suivantes expliquent comment créer un python-run-function opérateur à partir de zéro.

Étape 1 : Définir la logique

Écrivez votre fonction run() dans un carnet :

from typing import Dict, Any

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
    from pyspark.sql import functions as F
    df = inputs["in"]
    result = df.withColumn(config["column_name"], F.current_timestamp())
    return {"out": result}

Étape 2 : Tester la fonction

Testez la fonction de manière interactive avec des exemples de données :

test_df = spark.createDataFrame(
    [("Alice", 100), ("Bob", 200)],
    ["name", "amount"]
)

result = run(
    config={"column_name": "processed_at"},
    inputs={"in": test_df},
    spark=spark
)

result["out"].show()

Étape 3 : Créer la configuration YAML

Définissez les métadonnées d’opérateur, les champs de configuration et les ports dans un fichier YAML :

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
  type: object
  properties:
    column_name:
      type: string
      title: Column Name
      default: processed_at
      x-ui:
        widget: input
  required:
    - column_name

Étape 4 : Combiner la logique et YAML

Ajoutez les champs run_function et ports pour créer le fichier YAML complet. Enregistrez-le dans votre espace de travail, par exemple /Workspace/Users/<user-name>/udos/add_timestamp.yaml:

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
  type: object
  properties:
    column_name:
      type: string
      title: Column Name
      default: processed_at
      x-ui:
        widget: input
  required:
    - column_name
ports:
  input:
    - name: in
      title: Input
  output:
    - name: out
      title: Output
run_function:
  type: inline
  code: |
    from typing import Dict, Any

    def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
        from pyspark.sql import functions as F
        df = inputs["in"]
        result = df.withColumn(config["column_name"], F.current_timestamp())
        return {"out": result}

Étape 5 : Inscrire l’opérateur

Ajoutez le chemin du fichier à votre .user_defined_operators.yaml fichier :

operators:
  - /Workspace/Users/<user-name>/udos/add_timestamp.yaml

Étape 6 : Utiliser l’opérateur dans Lakeflow Designer

Ouvrez Lakeflow Designer et vérifiez que l’opérateur apparaît dans la palette d’opérateurs. Faites-le glisser sur le canevas, connectez une entrée, configurez le nom de la colonne et exécutez un aperçu.

Créer un opérateur UC complet défini par l’utilisateur

Les étapes suivantes décrivent la création d’un opérateur basé sur l’UC uc-udf.

Étape 1 : Définir la logique

Écrivez et testez votre logique de fonction dans un notebook :

def double_value(input_value: float) -> float:
    if input_value is None:
        return None
    return input_value * 2

Étape 2 : Créer la configuration YAML

Définissez les métadonnées d’opérateur, les champs de configuration et les ports :

schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: '1.0.0'
description: Doubles the input value
config:
  type: object
  properties:
    input_value:
      type: string
      title: Input Value
      format: expression
      x-ui:
        widget: expression
        port: input_data
  required:
    - input_value
ports:
  input:
    - name: input_data
      title: Input
  output:
    - name: out
      title: Output

Étape 3 : Combiner la logique et YAML

Créez la fonction catalogue Unity avec le yaML incorporé en tant que docstring :

CREATE OR REPLACE FUNCTION main.my_schema.double_value(input_value DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
  """
  schema: user-defined-operator-v0.1.0
  type: uc-udf
  name: Double Value
  id: math.double_value
  version: "1.0.0"
  description: Doubles the input value
  config:
    type: object
    properties:
      input_value:
        type: string
        title: Input Value
        format: expression
        x-ui:
          widget: expression
          port: input_data
    required:
      - input_value
  ports:
    input:
      - name: input_data
        title: Input
    output:
      - name: out
        title: Output
  """

  def double_value(input_value: float) -> float:
      if input_value is None:
          return None
      return input_value * 2

  return double_value(input_value)
$$

Étape 4 : Tester la fonction

SELECT main.my_schema.double_value(5) AS result;
-- Should return: 10

Étape 5 : Inscrire l’opérateur

Ajoutez la référence de la fonction catalogue Unity à votre .user_defined_operators.yaml fichier :

operators:
  - catalog: main
    schema: my_schema
    functionName: double_value

Étape 6 : Utiliser l’opérateur dans Lakeflow Designer

Ouvrez Lakeflow Designer et vérifiez que l’opérateur apparaît dans la palette d’opérateurs. Faites-le glisser sur le canevas, connectez une entrée et exécutez un aperçu.

Troubleshooting

Issue Solution
L’opérateur n’apparaît pas dans Lakeflow Designer. Vérifiez que .user_defined_operators.yaml existe et répertorie votre fonction ou votre chemin de fichier. Pour les opérateurs python-run-function, vérifiez le chemin du fichier et que le fichier YAML est accessible.
Échec de la validation du schéma. Vérifiez votre YAML sur le schéma officiel à l’adresse https://your-workspace.cloud.databricks.com/static/schemas/user-defined-operator-v0.1.0.json.
Permission refusée. Pour les opérateurs basés sur l’UC, vérifiez que les utilisateurs ont EXECUTE sur la fonction et USE SCHEMA sur le schéma. Pour les opérateurs python-run-function, vérifiez que les utilisateurs disposent d’un accès en lecture au fichier YAML.
python-run-function l’opérateur échoue au moment de l’exécution. Vérifiez que la signature de fonction run() correspond à def run(config, inputs, spark). Vérifiez que les noms de port dans le code correspondent à YAML et que les clés de dictionnaire renvoyées correspondent aux valeurs de port name de sortie.
L’UDTF retourne des types incorrects. Les types de retour UDTF doivent être explicites . Vous ne pouvez pas référencer les types de colonnes d’entrée.

Permissions

Autorisation Purpose
Accès en lecture à .user_defined_operators.yaml. Découvrez l’opérateur.
Accès en lecture au fichier YAML (python-run-function uniquement). Chargez la définition de l’opérateur.
EXECUTE sur la fonction catalogue Unity (opérateurs basés sur UC uniquement). Lancez l’opérateur.
USE SCHEMA sur le schéma (opérateurs basés sur l’UC uniquement). Accédez au schéma où la fonction est créée.
Autres autorisations Selon votre opérateur, les utilisateurs peuvent nécessiter d’autres autorisations. Par exemple, USE CONNECTION sur une connexion Unity Catalog pour les appels à l’API HTTP.

Étapes suivantes

Explorez les didacticiels suivants :

Example Catégorie Description
Expéditeur d’e-mail Gmail python-run-function Envoyez des données DataFrame sous forme de pièce jointe de messagerie CSV via Gmail.
Calculatrice d’intérêt composé uc-udf Calculez les valeurs futures d’investissement à l’aide de la formule d’intérêt composée.
Clustering k-moyennes uc-udtf Segmentez les données en clusters à l’aide de scikit-learn.
Envoyer un message Slack uc-udf Envoyez des notifications aux canaux Slack via l’API.
Tous les widgets d’interface utilisateur uc-udf Opérateur de référence présentant tous les widgets d’interface utilisateur disponibles.

Pour obtenir une référence complète au schéma YAML, consultez la référence YAML de l’opérateur défini par l’utilisateur.