Benutzerdefinierte Operatoren im Lakeflow Designer

Important

Dieses Feature befindet sich in der Public Preview.

Mit Lakeflow Designer können Sie benutzerdefinierte Operatoren erstellen, die direkt im Zeichenbereich neben integrierten Operatoren angezeigt werden. Verwenden Sie sie, um Lakeflow Designer mit Ihrer eigenen Geschäftslogik, Berechnungen oder Integrationen zu erweitern.

Es gibt drei Arten von benutzerdefinierten Operatoren:

  • python-run-function: Eine eigenständige YAML-Datei mit Inline-Python, die im Arbeitsbereich gespeichert ist. Am besten geeignet für Transformationen auf Datenframeebene und externe Integrationen. Berechtigungen werden auf Arbeitsbereichsdateiebene verwaltet.
  • uc-udf: Umschließt eine Skalarfunktion des Unity-Katalogs. Am besten geeignet für Transformationen auf Spaltenebene. Der Zugriff unterliegt den Unity-Katalogberechtigungen.
  • uc-udtf: Umschließt eine Tabellenwertfunktion des Unity-Katalogs. Am besten geeignet für Transformationen auf Tabellenebene wie ML-Clustering und Aggregation. Der Zugriff unterliegt den Unity-Katalogberechtigungen.
Funktion python-run-function uc-udf uc-udtf
Exemplarischer Anwendungsfall DataFrame-Transformationen, API-Integrationen, E-Mail-Benachrichtigungen Berechnung auf Spaltenebene (BMI, Zinssätze) ML-Clustering, zeilenübergreifende Aggregation
Eingabe Datenrahmen Einzelne Werte Gesamte Tabelle, Zeile nach Zeile
Output Datenrahmen Einzelner Wert Tabelle (mehrere Zeilen)
Erfordert Unity-Katalogfunktion No Yes Yes
Zugriffsverwaltung Arbeitsbereichsdateiberechtigungen Unity-Katalogberechtigungen (EXECUTE, USE SCHEMA) Unity-Katalogberechtigungen (EXECUTE, USE SCHEMA)
Unterstützte Sprachen Nur Python SQL oder Python in einem SQL-Wrapper SQL oder Python in einem SQL-Wrapper

Wie funktionieren benutzerdefinierte Operatoren?

Ein benutzerdefinierter Operator besteht aus:

  • Operatorlogik: Der Code, der ausgeführt wird, wenn der Operator ausgeführt wird. Dies kann eine Inlinefunktion Python run() (für python-run-function) oder eine Unity-Katalogfunktion (für uc-udf und uc-udtf) sein.
  • YAML-Konfiguration: Teilt Lakeflow Designer mit, wie der Operator auf der Benutzeroberfläche dargestellt wird, einschließlich Name, Beschreibung, Eingabeparameter, UI-Widgets und Ports. Alle Operatortypen verwenden das user-defined-operator-v0.1.0 Schema.
  • Registrierungsdatei: Ein Eintrag in .user_defined_operators.yaml, der es Lakeflow Designer ermöglicht, den Operator zu erkennen.

Logik des Operators

Python-Run-Funktion für benutzerdefinierte Operatorlogik

Jeder python-run-function Operator muss eine run() Funktion definieren:

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
  • config: Vom Benutzer konfigurierte Werte aus der Benutzeroberfläche, schlüsselt nach Eigenschaftsname.
  • inputs: Input-DataFrames, nach Eingabeport name.
  • spark: Die aktive SparkSession.
  • Gibt zurück: Ein Wörterbuch, das Ausgabeportwerte name DataFrames zuordnet.

Im folgenden Beispiel werden Zeilen aus einem Eingabedatenframe gefiltert:

def run(config, inputs, spark):
    df = inputs["in"]
    filtered = df.filter(config["filter_expression"])
    return {"out": filtered}

Wenn Ihr Operator externe Pip-Pakete benötigt, fügen Sie das environment Feld zum YAML hinzu:

environment:
  environment_version: '1'
  dependencies:
    - requests==2.31.0
    - beautifulsoup4==4.12.0

UDF- und UDTF-Operatorlogik

Sie können UC-Funktionen in SQL oder Python schreiben. Python Funktionen werden in eine SQL-CREATE FUNCTION-Anweisung eingeschlossen:

SQL-Funktion:

CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE SQL
RETURN
  SELECT weight_kg / (height_m * height_m);

Python-Funktion (in SQL eingeschlossen):

CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
  return weight_kg / (height_m ** 2)
$$;

UDFs verarbeiten jeweils einen einzelnen Wert und geben einen berechneten Wert zurück. UDTFs verarbeiten Tabellenzeile nach Zeile und können den Zustand über alle Zeilen hinweg verwalten. Verwenden Sie uc-udf für Transformationen auf Spaltenebene und uc-udtf für Operationen wie ML-Clustering oder Aggregation.

Darüber hinaus müssen Sie mit UDTFs drei Schlüsselmethoden definieren: __init__(), , eval()und terminate():

class MyOperator:
    def __init__(self):
        # Called before processing - initialize any values needed.

    def eval(self, row, id_column, columns, k):
        # Called once per input row - accumulate data here.

    def terminate(self):
        # Called after all rows - perform final calculations and yield results.

Note

UDTF-Rückgabetabellen müssen feste, explizite Typen aufweisen. Sie können in der Rückgabekonfiguration nicht auf Eingabespaltentypen verweisen.

YAML-Konfiguration

Die YAML-Konfiguration teilt Lakeflow Designer mit, wie der Operator auf der Benutzeroberfläche dargestellt wird. Er definiert den Namen, die Beschreibung, eingabeparameter, UI-Widgets und Ports des Operators. Jedes Konfigurationsfeld ist eine Eigenschaft mit einem Typ, Titel und optionalen x-ui Widgethinweisen:

config:
  type: object
  properties:
    my_param:
      type: string
      title: My Parameter
      x-ui:
        widget: input
    my_expression:
      type: string
      title: Column
      format: expression
      x-ui:
        widget: expression
        port: in
    my_number:
      type: number
      title: Count
      default: 10
      minimum: 0
      maximum: 100
  required:
    - my_param
    - my_expression

Ausführliche Informationen zum YAML-Schema, einschließlich aller Widgettypen und Konfigurationsoptionen, finden Sie in der YAML-Referenz zum benutzerdefinierten Operator.

Häfen

Ports definieren die Eingaben und Ausgaben für Ihren Operator:

ports:
  input:
    - name: in
      title: Input Data
      mime: application/vnd.databricks.dataframe
      required: true
      allowMultiple: false
  output:
    - name: out
      title: Output Data

YAML für Python-Operatoren für Ausführungsfunktionen

Für python-run-function-Operatoren ist die YAML-Datei eigenständig und enthält ein run_function-Feld mit Inline-Python Code:

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Filter Rows
id: filter_rows
version: '1.0.0'
description: Filters rows based on a SQL expression.
config:
  type: object
  properties:
    filter_expression:
      type: string
      title: Filter Expression
      x-ui:
        widget: input
  required:
    - filter_expression
ports:
  input:
    - name: in
      title: Input
  output:
    - name: out
      title: Output
run_function:
  type: inline
  code: |
    def run(config, inputs, spark):
        df = inputs["in"]
        filtered = df.filter(config["filter_expression"])
        return {"out": filtered}

YAML für Unity-Katalogfunktionen

Betten Sie für UC-basierte Operatoren die YAML-Konfiguration als Kommentar oder Docstring in Ihre Funktion ein.

In SQL (Kommentar verwenden /* ... */ ):

RETURN(/*
  schema: user-defined-operator-v0.1.0
  type: uc-udf
  name: Calculate BMI
  id: calculate_bmi
  version: "1.0.0"
  description: Calculates BMI from weight and height.
  config:
    type: object
    properties:
      weight_kg:
        type: string
        title: Weight (in kg)
        format: expression
        x-ui:
          widget: expression
          port: in
      height_m:
        type: string
        title: Height (in meters)
        format: expression
        x-ui:
          widget: expression
          port: in
    required:
      - weight_kg
      - height_m
  ports:
    input:
      - name: in
        title: Input Data
    output:
      - name: out
        title: Output
    */
  SELECT weight_kg / (height_m * height_m)
);

In Python (verwenden Sie """ ... """ docstring):

AS $$
  """
  schema: user-defined-operator-v0.1.0
  type: uc-udf
  name: Calculate BMI
  id: calculate_bmi
  version: "1.0.0"
  description: Calculates BMI from weight and height.
  config:
    type: object
    properties:
      weight_kg:
        type: string
        title: Weight (in kg)
        format: expression
        x-ui:
          widget: expression
          port: in
      height_m:
        type: string
        title: Height (in meters)
        format: expression
        x-ui:
          widget: expression
          port: in
    required:
      - weight_kg
      - height_m
  ports:
    input:
      - name: in
        title: Input Data
    output:
      - name: out
        title: Output
  """

  return weight_kg / (height_m ** 2)
$$;

Registrieren und Bereitstellen Ihres Operators im Lakeflow Designer

Damit Ihr Operator im Lakeflow Designer angezeigt wird, registrieren Sie ihn in einer .user_defined_operators.yaml Datei:

  • Arbeitsbereichsebene: Platzieren Sie die Datei im Stammverzeichnis Ihres Arbeitsbereichs, um den Operator für alle Benutzer sichtbar zu machen.
  • Benutzerebene: Platzieren Sie die Datei in Ihrem Benutzerstartordner (/Workspace/Users/<user-name>/.user_defined_operators.yaml), damit Operatoren nur für Sie sichtbar sind.

Der operators: Abschnitt unterstützt Dateipfade, Unity Catalog-Funktionsverweise und Globmuster. Sie können Eintragstypen kombinieren:

operators:
  # File path (python-run-function operators)
  - /Workspace/Users/me/udos/my_operator.yaml
  # Glob pattern (registers all matching files)
  - /Workspace/Users/me/udos/transforms/*.yaml
  # UC function reference (uc-udf and uc-udtf operators)
  - catalog: my_catalog
    schema: my_schema
    functionName: my_function

Erweiterte Konfigurationen

Vorschaumodus

Lakeflow Designer unterstützt Vorschauen im Entwurfsmodus. Fügen Sie für Operatoren, die externe APIs aufrufen oder in externe Systeme schreiben, eine is_preview Konfigurationseigenschaft hinzu, damit Sie während der Vorschau Nebenwirkungen überspringen können. Wenn der Vorschaumodus aktiviert ist, müssen Benutzer explizit auf "Ausführen" klicken, um den Operator mit Nebenwirkungen auszuführen.

config:
  type: object
  properties:
    is_preview:
      type: boolean
      format: is_preview
      default: false

Lakeflow Designer setzt diesen Wert während der Vorschau automatisch auf true. Berücksichtigen Sie dies in Ihrer Logik, um Seiteneffekte zu vermeiden:

# In a python-run-function
if config.get("is_preview"):
    return {"out": inputs["in"]}

# In a UC function (SQL)
CASE WHEN is_preview THEN 'preview' ELSE /* actual work */ END

Unity-Katalogverbindungen

Verwenden Sie für UC-basierte SQL-Operatoren, die externe APIs aufrufen, Unity Catalog HTTP-Verbindungen, um Anmeldeinformationen sicher zu speichern:

CREATE CONNECTION my_api_connection TYPE HTTP OPTIONS (
  host 'https://api.example.com',
  port '443',
  base_path '/v1/',
  bearer_token 'your-token-here'
);

Verwenden Sie dann die Verbindung in Ihrer SQL-UDF mit der http_request() Funktion. Ausführliche Informationen finden Sie unter Herstellen einer Verbindung mit externen HTTP-Diensten.

WorkspaceClient

Für python-run-function-Operatoren können Sie die Azure Databricks WorkspaceClient verwenden, um auf Arbeitsbereichsressourcen und externe APIs zuzugreifen:

def run(config, inputs, spark):
    from databricks.sdk import WorkspaceClient
    w = WorkspaceClient()
    # Use w to access workspace resources

Erstellen Sie einen vollständigen benutzerdefinierten Operator „python-run-function“

Die folgenden Schritte führen Sie durch das Erstellen eines python-run-function Operators von Grund auf.

Schritt 1: Definieren der Logik

Schreiben Sie Ihre run() Funktion in einem Notizbuch:

from typing import Dict, Any

def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
    from pyspark.sql import functions as F
    df = inputs["in"]
    result = df.withColumn(config["column_name"], F.current_timestamp())
    return {"out": result}

Schritt 2: Testen der Funktion

Testen Sie die Funktion interaktiv mit Beispieldaten:

test_df = spark.createDataFrame(
    [("Alice", 100), ("Bob", 200)],
    ["name", "amount"]
)

result = run(
    config={"column_name": "processed_at"},
    inputs={"in": test_df},
    spark=spark
)

result["out"].show()

Schritt 3: Erstellen der YAML-Konfiguration

Definieren Sie die Operatormetadaten, Konfigurationsfelder und Ports in einer YAML-Datei:

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
  type: object
  properties:
    column_name:
      type: string
      title: Column Name
      default: processed_at
      x-ui:
        widget: input
  required:
    - column_name

Schritt 4: Kombinieren der Logik und YAML

Fügen Sie die Felder run_function und ports hinzu, um die vollständige YAML-Datei zu erstellen. Speichern Sie es in Ihrem Arbeitsbereich, z. B. /Workspace/Users/<user-name>/udos/add_timestamp.yaml:

schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
  type: object
  properties:
    column_name:
      type: string
      title: Column Name
      default: processed_at
      x-ui:
        widget: input
  required:
    - column_name
ports:
  input:
    - name: in
      title: Input
  output:
    - name: out
      title: Output
run_function:
  type: inline
  code: |
    from typing import Dict, Any

    def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
        from pyspark.sql import functions as F
        df = inputs["in"]
        result = df.withColumn(config["column_name"], F.current_timestamp())
        return {"out": result}

Schritt 5: Registrieren des Operators

Fügen Sie den Dateipfad zu Ihrer .user_defined_operators.yaml Datei hinzu:

operators:
  - /Workspace/Users/<user-name>/udos/add_timestamp.yaml

Schritt 6: Verwenden des Operators in Lakeflow Designer

Öffnen Sie Lakeflow Designer, und überprüfen Sie, ob der Operator in der Operatorpalette angezeigt wird. Ziehen Sie es auf die Arbeitsfläche, verbinden Sie einen Eingang, konfigurieren Sie den Spaltennamen und starten Sie eine Vorschau.

Erstellen Sie einen vollständigen benutzerdefinierten UC-Operator

Die folgenden Schritte führen Sie durch das Erstellen eines UC-basierten uc-udf Operators.

Schritt 1: Definieren der Logik

Schreiben und Testen der Funktionslogik in einem Notizbuch:

def double_value(input_value: float) -> float:
    if input_value is None:
        return None
    return input_value * 2

Schritt 2: Erstellen der YAML-Konfiguration

Definieren Sie die Operatormetadaten, Konfigurationsfelder und Ports:

schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: '1.0.0'
description: Doubles the input value
config:
  type: object
  properties:
    input_value:
      type: string
      title: Input Value
      format: expression
      x-ui:
        widget: expression
        port: input_data
  required:
    - input_value
ports:
  input:
    - name: input_data
      title: Input
  output:
    - name: out
      title: Output

Schritt 3: Kombinieren der Logik und YAML

Erstellen Sie die Unity-Katalogfunktion mit dem yaML, das als Dokumentzeichenfolge eingebettet ist:

CREATE OR REPLACE FUNCTION main.my_schema.double_value(input_value DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
  """
  schema: user-defined-operator-v0.1.0
  type: uc-udf
  name: Double Value
  id: math.double_value
  version: "1.0.0"
  description: Doubles the input value
  config:
    type: object
    properties:
      input_value:
        type: string
        title: Input Value
        format: expression
        x-ui:
          widget: expression
          port: input_data
    required:
      - input_value
  ports:
    input:
      - name: input_data
        title: Input
    output:
      - name: out
        title: Output
  """

  def double_value(input_value: float) -> float:
      if input_value is None:
          return None
      return input_value * 2

  return double_value(input_value)
$$

Schritt 4: Testen der Funktion

SELECT main.my_schema.double_value(5) AS result;
-- Should return: 10

Schritt 5: Registrieren des Operators

Fügen Sie der Datei den Unity-Katalog-Funktionsverweis hinzu .user_defined_operators.yaml :

operators:
  - catalog: main
    schema: my_schema
    functionName: double_value

Schritt 6: Verwenden des Operators in Lakeflow Designer

Öffnen Sie Lakeflow Designer, und überprüfen Sie, ob der Operator in der Operatorpalette angezeigt wird. Ziehen Sie es auf die Arbeitsfläche, schließen Sie einen Eingang an und führen Sie die Vorschau aus.

Troubleshooting

Issue Lösung
Der Operator wird im Lakeflow-Designer nicht angezeigt. Überprüfen Sie, ob .user_defined_operators.yaml vorhanden ist und Ihre Funktion oder Ihren Dateipfad auflistet. Überprüfen Sie für python-run-function Operatoren den Dateipfad, und stellen Sie sicher, dass auf die YAML-Datei zugegriffen werden kann.
Die Schemaüberprüfung schlägt fehl. Überprüfen Sie Ihr YAML anhand des offiziellen Schemas unter https://your-workspace.cloud.databricks.com/static/schemas/user-defined-operator-v0.1.0.json.
Zugriff verweigert. Überprüfen Sie bei UC-basierten Operatoren, ob Benutzer EXECUTE für die Funktion und USE SCHEMA für das Schema haben. Überprüfen Sie bei python-run-function Operatoren, ob Benutzer Lesezugriff auf die YAML-Datei haben.
python-run-function Der Operator schlägt zur Laufzeit fehl. Überprüfen Sie, ob die run() Funktionssignatur übereinstimmt def run(config, inputs, spark). Stellen Sie sicher, dass Portnamen im Code dem YAML entsprechen und dass die Rückgabewörterbuchschlüssel den Ausgabeportwerten name entsprechen.
UDTF gibt falsche Typen zurück. UDTF-Rückgabetypen müssen explizit sein . Sie können nicht auf Eingabespaltentypen verweisen.

Erlaubnisse

Erlaubnis Purpose
Lesezugriff auf .user_defined_operators.yaml. Entdecken Sie den Operator.
Lesezugriff auf die YAML-Datei (python-run-function nur). Laden Sie die Definition des Operators.
EXECUTE für die Unity Catalog-Funktion (nur UC-basierte Operatoren). Starten Sie den Operator.
USE SCHEMA auf dem Schema (nur UC-basierte Operatoren). Greifen Sie auf das Schema zu, in dem die Funktion erstellt wird.
Andere Berechtigungen Je nach Operator benötigen Benutzer möglicherweise andere Berechtigungen. Zum Beispiel USE CONNECTION für eine Unity Catalog-Verbindung für HTTP-API-Aufrufe.

Nächste Schritte

Entdecken Sie die folgenden Anleitungen:

Example Typ Description
Gmail-E-Mail-Absender python-run-function Senden von DataFrame-Daten als CSV-E-Mail-Anlage über Gmail.
Zinseszinsrechner uc-udf Berechnen zukünftiger Anlagewerte mithilfe der Zinsformel.
k-Means-Clustering uc-udtf Segmentieren von Daten in Cluster mithilfe von Scikit-Learn.
Slack-Nachricht senden uc-udf Senden von Benachrichtigungen an Slack-Kanäle über API.
Alle UI-Widgets uc-udf Referenzoperator, der alle verfügbaren UI-Widgets zeigt.

Eine vollständige Referenz zum YAML-Schema finden Sie unter YAML-Referenz für benutzerdefinierte Operatoren.