Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Important
Dieses Feature befindet sich in der Public Preview.
Mit Lakeflow Designer können Sie benutzerdefinierte Operatoren erstellen, die direkt im Zeichenbereich neben integrierten Operatoren angezeigt werden. Verwenden Sie sie, um Lakeflow Designer mit Ihrer eigenen Geschäftslogik, Berechnungen oder Integrationen zu erweitern.
Es gibt drei Arten von benutzerdefinierten Operatoren:
-
python-run-function: Eine eigenständige YAML-Datei mit Inline-Python, die im Arbeitsbereich gespeichert ist. Am besten geeignet für Transformationen auf Datenframeebene und externe Integrationen. Berechtigungen werden auf Arbeitsbereichsdateiebene verwaltet. -
uc-udf: Umschließt eine Skalarfunktion des Unity-Katalogs. Am besten geeignet für Transformationen auf Spaltenebene. Der Zugriff unterliegt den Unity-Katalogberechtigungen. -
uc-udtf: Umschließt eine Tabellenwertfunktion des Unity-Katalogs. Am besten geeignet für Transformationen auf Tabellenebene wie ML-Clustering und Aggregation. Der Zugriff unterliegt den Unity-Katalogberechtigungen.
| Funktion | python-run-function |
uc-udf |
uc-udtf |
|---|---|---|---|
| Exemplarischer Anwendungsfall | DataFrame-Transformationen, API-Integrationen, E-Mail-Benachrichtigungen | Berechnung auf Spaltenebene (BMI, Zinssätze) | ML-Clustering, zeilenübergreifende Aggregation |
| Eingabe | Datenrahmen | Einzelne Werte | Gesamte Tabelle, Zeile nach Zeile |
| Output | Datenrahmen | Einzelner Wert | Tabelle (mehrere Zeilen) |
| Erfordert Unity-Katalogfunktion | No | Yes | Yes |
| Zugriffsverwaltung | Arbeitsbereichsdateiberechtigungen | Unity-Katalogberechtigungen (EXECUTE, USE SCHEMA) |
Unity-Katalogberechtigungen (EXECUTE, USE SCHEMA) |
| Unterstützte Sprachen | Nur Python | SQL oder Python in einem SQL-Wrapper | SQL oder Python in einem SQL-Wrapper |
Wie funktionieren benutzerdefinierte Operatoren?
Ein benutzerdefinierter Operator besteht aus:
-
Operatorlogik: Der Code, der ausgeführt wird, wenn der Operator ausgeführt wird. Dies kann eine Inlinefunktion Python
run()(fürpython-run-function) oder eine Unity-Katalogfunktion (füruc-udfunduc-udtf) sein. -
YAML-Konfiguration: Teilt Lakeflow Designer mit, wie der Operator auf der Benutzeroberfläche dargestellt wird, einschließlich Name, Beschreibung, Eingabeparameter, UI-Widgets und Ports. Alle Operatortypen verwenden das
user-defined-operator-v0.1.0Schema. -
Registrierungsdatei: Ein Eintrag in
.user_defined_operators.yaml, der es Lakeflow Designer ermöglicht, den Operator zu erkennen.
Logik des Operators
Python-Run-Funktion für benutzerdefinierte Operatorlogik
Jeder python-run-function Operator muss eine run() Funktion definieren:
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
-
config: Vom Benutzer konfigurierte Werte aus der Benutzeroberfläche, schlüsselt nach Eigenschaftsname. -
inputs: Input-DataFrames, nach Eingabeportname. -
spark: Die aktive SparkSession. -
Gibt zurück: Ein Wörterbuch, das Ausgabeportwerte
nameDataFrames zuordnet.
Im folgenden Beispiel werden Zeilen aus einem Eingabedatenframe gefiltert:
def run(config, inputs, spark):
df = inputs["in"]
filtered = df.filter(config["filter_expression"])
return {"out": filtered}
Wenn Ihr Operator externe Pip-Pakete benötigt, fügen Sie das environment Feld zum YAML hinzu:
environment:
environment_version: '1'
dependencies:
- requests==2.31.0
- beautifulsoup4==4.12.0
UDF- und UDTF-Operatorlogik
Sie können UC-Funktionen in SQL oder Python schreiben. Python Funktionen werden in eine SQL-CREATE FUNCTION-Anweisung eingeschlossen:
SQL-Funktion:
CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE SQL
RETURN
SELECT weight_kg / (height_m * height_m);
Python-Funktion (in SQL eingeschlossen):
CREATE OR REPLACE FUNCTION my_catalog.my_schema.calculate_bmi(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
return weight_kg / (height_m ** 2)
$$;
UDFs verarbeiten jeweils einen einzelnen Wert und geben einen berechneten Wert zurück. UDTFs verarbeiten Tabellenzeile nach Zeile und können den Zustand über alle Zeilen hinweg verwalten. Verwenden Sie uc-udf für Transformationen auf Spaltenebene und uc-udtf für Operationen wie ML-Clustering oder Aggregation.
Darüber hinaus müssen Sie mit UDTFs drei Schlüsselmethoden definieren: __init__(), , eval()und terminate():
class MyOperator:
def __init__(self):
# Called before processing - initialize any values needed.
def eval(self, row, id_column, columns, k):
# Called once per input row - accumulate data here.
def terminate(self):
# Called after all rows - perform final calculations and yield results.
Note
UDTF-Rückgabetabellen müssen feste, explizite Typen aufweisen. Sie können in der Rückgabekonfiguration nicht auf Eingabespaltentypen verweisen.
YAML-Konfiguration
Die YAML-Konfiguration teilt Lakeflow Designer mit, wie der Operator auf der Benutzeroberfläche dargestellt wird. Er definiert den Namen, die Beschreibung, eingabeparameter, UI-Widgets und Ports des Operators. Jedes Konfigurationsfeld ist eine Eigenschaft mit einem Typ, Titel und optionalen x-ui Widgethinweisen:
config:
type: object
properties:
my_param:
type: string
title: My Parameter
x-ui:
widget: input
my_expression:
type: string
title: Column
format: expression
x-ui:
widget: expression
port: in
my_number:
type: number
title: Count
default: 10
minimum: 0
maximum: 100
required:
- my_param
- my_expression
Ausführliche Informationen zum YAML-Schema, einschließlich aller Widgettypen und Konfigurationsoptionen, finden Sie in der YAML-Referenz zum benutzerdefinierten Operator.
Häfen
Ports definieren die Eingaben und Ausgaben für Ihren Operator:
ports:
input:
- name: in
title: Input Data
mime: application/vnd.databricks.dataframe
required: true
allowMultiple: false
output:
- name: out
title: Output Data
YAML für Python-Operatoren für Ausführungsfunktionen
Für python-run-function-Operatoren ist die YAML-Datei eigenständig und enthält ein run_function-Feld mit Inline-Python Code:
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Filter Rows
id: filter_rows
version: '1.0.0'
description: Filters rows based on a SQL expression.
config:
type: object
properties:
filter_expression:
type: string
title: Filter Expression
x-ui:
widget: input
required:
- filter_expression
ports:
input:
- name: in
title: Input
output:
- name: out
title: Output
run_function:
type: inline
code: |
def run(config, inputs, spark):
df = inputs["in"]
filtered = df.filter(config["filter_expression"])
return {"out": filtered}
YAML für Unity-Katalogfunktionen
Betten Sie für UC-basierte Operatoren die YAML-Konfiguration als Kommentar oder Docstring in Ihre Funktion ein.
In SQL (Kommentar verwenden /* ... */ ):
RETURN(/*
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Calculate BMI
id: calculate_bmi
version: "1.0.0"
description: Calculates BMI from weight and height.
config:
type: object
properties:
weight_kg:
type: string
title: Weight (in kg)
format: expression
x-ui:
widget: expression
port: in
height_m:
type: string
title: Height (in meters)
format: expression
x-ui:
widget: expression
port: in
required:
- weight_kg
- height_m
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
*/
SELECT weight_kg / (height_m * height_m)
);
In Python (verwenden Sie """ ... """ docstring):
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Calculate BMI
id: calculate_bmi
version: "1.0.0"
description: Calculates BMI from weight and height.
config:
type: object
properties:
weight_kg:
type: string
title: Weight (in kg)
format: expression
x-ui:
widget: expression
port: in
height_m:
type: string
title: Height (in meters)
format: expression
x-ui:
widget: expression
port: in
required:
- weight_kg
- height_m
ports:
input:
- name: in
title: Input Data
output:
- name: out
title: Output
"""
return weight_kg / (height_m ** 2)
$$;
Registrieren und Bereitstellen Ihres Operators im Lakeflow Designer
Damit Ihr Operator im Lakeflow Designer angezeigt wird, registrieren Sie ihn in einer .user_defined_operators.yaml Datei:
- Arbeitsbereichsebene: Platzieren Sie die Datei im Stammverzeichnis Ihres Arbeitsbereichs, um den Operator für alle Benutzer sichtbar zu machen.
-
Benutzerebene: Platzieren Sie die Datei in Ihrem Benutzerstartordner (
/Workspace/Users/<user-name>/.user_defined_operators.yaml), damit Operatoren nur für Sie sichtbar sind.
Der operators: Abschnitt unterstützt Dateipfade, Unity Catalog-Funktionsverweise und Globmuster. Sie können Eintragstypen kombinieren:
operators:
# File path (python-run-function operators)
- /Workspace/Users/me/udos/my_operator.yaml
# Glob pattern (registers all matching files)
- /Workspace/Users/me/udos/transforms/*.yaml
# UC function reference (uc-udf and uc-udtf operators)
- catalog: my_catalog
schema: my_schema
functionName: my_function
Erweiterte Konfigurationen
Vorschaumodus
Lakeflow Designer unterstützt Vorschauen im Entwurfsmodus. Fügen Sie für Operatoren, die externe APIs aufrufen oder in externe Systeme schreiben, eine is_preview Konfigurationseigenschaft hinzu, damit Sie während der Vorschau Nebenwirkungen überspringen können. Wenn der Vorschaumodus aktiviert ist, müssen Benutzer explizit auf "Ausführen" klicken, um den Operator mit Nebenwirkungen auszuführen.
config:
type: object
properties:
is_preview:
type: boolean
format: is_preview
default: false
Lakeflow Designer setzt diesen Wert während der Vorschau automatisch auf true. Berücksichtigen Sie dies in Ihrer Logik, um Seiteneffekte zu vermeiden:
# In a python-run-function
if config.get("is_preview"):
return {"out": inputs["in"]}
# In a UC function (SQL)
CASE WHEN is_preview THEN 'preview' ELSE /* actual work */ END
Unity-Katalogverbindungen
Verwenden Sie für UC-basierte SQL-Operatoren, die externe APIs aufrufen, Unity Catalog HTTP-Verbindungen, um Anmeldeinformationen sicher zu speichern:
CREATE CONNECTION my_api_connection TYPE HTTP OPTIONS (
host 'https://api.example.com',
port '443',
base_path '/v1/',
bearer_token 'your-token-here'
);
Verwenden Sie dann die Verbindung in Ihrer SQL-UDF mit der http_request() Funktion. Ausführliche Informationen finden Sie unter Herstellen einer Verbindung mit externen HTTP-Diensten.
WorkspaceClient
Für python-run-function-Operatoren können Sie die Azure Databricks WorkspaceClient verwenden, um auf Arbeitsbereichsressourcen und externe APIs zuzugreifen:
def run(config, inputs, spark):
from databricks.sdk import WorkspaceClient
w = WorkspaceClient()
# Use w to access workspace resources
Erstellen Sie einen vollständigen benutzerdefinierten Operator „python-run-function“
Die folgenden Schritte führen Sie durch das Erstellen eines python-run-function Operators von Grund auf.
Schritt 1: Definieren der Logik
Schreiben Sie Ihre run() Funktion in einem Notizbuch:
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
from pyspark.sql import functions as F
df = inputs["in"]
result = df.withColumn(config["column_name"], F.current_timestamp())
return {"out": result}
Schritt 2: Testen der Funktion
Testen Sie die Funktion interaktiv mit Beispieldaten:
test_df = spark.createDataFrame(
[("Alice", 100), ("Bob", 200)],
["name", "amount"]
)
result = run(
config={"column_name": "processed_at"},
inputs={"in": test_df},
spark=spark
)
result["out"].show()
Schritt 3: Erstellen der YAML-Konfiguration
Definieren Sie die Operatormetadaten, Konfigurationsfelder und Ports in einer YAML-Datei:
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
type: object
properties:
column_name:
type: string
title: Column Name
default: processed_at
x-ui:
widget: input
required:
- column_name
Schritt 4: Kombinieren der Logik und YAML
Fügen Sie die Felder run_function und ports hinzu, um die vollständige YAML-Datei zu erstellen. Speichern Sie es in Ihrem Arbeitsbereich, z. B. /Workspace/Users/<user-name>/udos/add_timestamp.yaml:
schema: user-defined-operator-v0.1.0
type: python-run-function
name: Add Timestamp
id: transforms.add_timestamp
version: '1.0.0'
description: Adds a timestamp column to the input DataFrame.
config:
type: object
properties:
column_name:
type: string
title: Column Name
default: processed_at
x-ui:
widget: input
required:
- column_name
ports:
input:
- name: in
title: Input
output:
- name: out
title: Output
run_function:
type: inline
code: |
from typing import Dict, Any
def run(config: Dict[str, Any], inputs: Dict[str, Any], spark) -> Dict[str, Any]:
from pyspark.sql import functions as F
df = inputs["in"]
result = df.withColumn(config["column_name"], F.current_timestamp())
return {"out": result}
Schritt 5: Registrieren des Operators
Fügen Sie den Dateipfad zu Ihrer .user_defined_operators.yaml Datei hinzu:
operators:
- /Workspace/Users/<user-name>/udos/add_timestamp.yaml
Schritt 6: Verwenden des Operators in Lakeflow Designer
Öffnen Sie Lakeflow Designer, und überprüfen Sie, ob der Operator in der Operatorpalette angezeigt wird. Ziehen Sie es auf die Arbeitsfläche, verbinden Sie einen Eingang, konfigurieren Sie den Spaltennamen und starten Sie eine Vorschau.
Erstellen Sie einen vollständigen benutzerdefinierten UC-Operator
Die folgenden Schritte führen Sie durch das Erstellen eines UC-basierten uc-udf Operators.
Schritt 1: Definieren der Logik
Schreiben und Testen der Funktionslogik in einem Notizbuch:
def double_value(input_value: float) -> float:
if input_value is None:
return None
return input_value * 2
Schritt 2: Erstellen der YAML-Konfiguration
Definieren Sie die Operatormetadaten, Konfigurationsfelder und Ports:
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: '1.0.0'
description: Doubles the input value
config:
type: object
properties:
input_value:
type: string
title: Input Value
format: expression
x-ui:
widget: expression
port: input_data
required:
- input_value
ports:
input:
- name: input_data
title: Input
output:
- name: out
title: Output
Schritt 3: Kombinieren der Logik und YAML
Erstellen Sie die Unity-Katalogfunktion mit dem yaML, das als Dokumentzeichenfolge eingebettet ist:
CREATE OR REPLACE FUNCTION main.my_schema.double_value(input_value DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
AS $$
"""
schema: user-defined-operator-v0.1.0
type: uc-udf
name: Double Value
id: math.double_value
version: "1.0.0"
description: Doubles the input value
config:
type: object
properties:
input_value:
type: string
title: Input Value
format: expression
x-ui:
widget: expression
port: input_data
required:
- input_value
ports:
input:
- name: input_data
title: Input
output:
- name: out
title: Output
"""
def double_value(input_value: float) -> float:
if input_value is None:
return None
return input_value * 2
return double_value(input_value)
$$
Schritt 4: Testen der Funktion
SELECT main.my_schema.double_value(5) AS result;
-- Should return: 10
Schritt 5: Registrieren des Operators
Fügen Sie der Datei den Unity-Katalog-Funktionsverweis hinzu .user_defined_operators.yaml :
operators:
- catalog: main
schema: my_schema
functionName: double_value
Schritt 6: Verwenden des Operators in Lakeflow Designer
Öffnen Sie Lakeflow Designer, und überprüfen Sie, ob der Operator in der Operatorpalette angezeigt wird. Ziehen Sie es auf die Arbeitsfläche, schließen Sie einen Eingang an und führen Sie die Vorschau aus.
Troubleshooting
| Issue | Lösung |
|---|---|
| Der Operator wird im Lakeflow-Designer nicht angezeigt. | Überprüfen Sie, ob .user_defined_operators.yaml vorhanden ist und Ihre Funktion oder Ihren Dateipfad auflistet. Überprüfen Sie für python-run-function Operatoren den Dateipfad, und stellen Sie sicher, dass auf die YAML-Datei zugegriffen werden kann. |
| Die Schemaüberprüfung schlägt fehl. | Überprüfen Sie Ihr YAML anhand des offiziellen Schemas unter https://your-workspace.cloud.databricks.com/static/schemas/user-defined-operator-v0.1.0.json. |
| Zugriff verweigert. | Überprüfen Sie bei UC-basierten Operatoren, ob Benutzer EXECUTE für die Funktion und USE SCHEMA für das Schema haben. Überprüfen Sie bei python-run-function Operatoren, ob Benutzer Lesezugriff auf die YAML-Datei haben. |
python-run-function Der Operator schlägt zur Laufzeit fehl. |
Überprüfen Sie, ob die run() Funktionssignatur übereinstimmt def run(config, inputs, spark). Stellen Sie sicher, dass Portnamen im Code dem YAML entsprechen und dass die Rückgabewörterbuchschlüssel den Ausgabeportwerten name entsprechen. |
| UDTF gibt falsche Typen zurück. | UDTF-Rückgabetypen müssen explizit sein . Sie können nicht auf Eingabespaltentypen verweisen. |
Erlaubnisse
| Erlaubnis | Purpose |
|---|---|
Lesezugriff auf .user_defined_operators.yaml. |
Entdecken Sie den Operator. |
Lesezugriff auf die YAML-Datei (python-run-function nur). |
Laden Sie die Definition des Operators. |
| EXECUTE für die Unity Catalog-Funktion (nur UC-basierte Operatoren). | Starten Sie den Operator. |
| USE SCHEMA auf dem Schema (nur UC-basierte Operatoren). | Greifen Sie auf das Schema zu, in dem die Funktion erstellt wird. |
| Andere Berechtigungen | Je nach Operator benötigen Benutzer möglicherweise andere Berechtigungen. Zum Beispiel USE CONNECTION für eine Unity Catalog-Verbindung für HTTP-API-Aufrufe. |
Nächste Schritte
Entdecken Sie die folgenden Anleitungen:
| Example | Typ | Description |
|---|---|---|
| Gmail-E-Mail-Absender | python-run-function |
Senden von DataFrame-Daten als CSV-E-Mail-Anlage über Gmail. |
| Zinseszinsrechner | uc-udf |
Berechnen zukünftiger Anlagewerte mithilfe der Zinsformel. |
| k-Means-Clustering | uc-udtf |
Segmentieren von Daten in Cluster mithilfe von Scikit-Learn. |
| Slack-Nachricht senden | uc-udf |
Senden von Benachrichtigungen an Slack-Kanäle über API. |
| Alle UI-Widgets | uc-udf |
Referenzoperator, der alle verfügbaren UI-Widgets zeigt. |
Eine vollständige Referenz zum YAML-Schema finden Sie unter YAML-Referenz für benutzerdefinierte Operatoren.