Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Mit benutzerdefinierten Funktionen (USER-Defined Functions, UDFs) können Sie Code wiederverwenden und freigeben, der integrierte Funktionen in Azure Databricks erweitert. Verwenden Sie UDFs, um bestimmte Aufgaben wie komplexe Berechnungen, Transformationen oder benutzerdefinierte Datenmanipulationen auszuführen.
Wann soll eine UDF- und Apache Spark-Funktion verwendet werden?
Verwenden Sie UDFs für Logik, die mit integrierten Apache Spark-Funktionen schwer ausgedrückt werden kann. Integrierte Apache Spark-Funktionen sind für die verteilte Verarbeitung optimiert und gewährleisten eine bessere Leistung im großen Maßstab. Weitere Informationen finden Sie unter Funktionen.
Databricks empfiehlt UDFs für Ad-hoc-Abfragen, manuelle Datenbereinigung, explorative Datenanalyse und Vorgänge für kleine bis mittlere Datasets. Häufige Anwendungsfälle für UDFs sind Datenverschlüsselung, Entschlüsselung, Hashing, JSON-Analyse und Validierung.
Verwenden Sie Apache Spark-Methoden für Vorgänge auf sehr großen Datasets und alle Workloads werden regelmäßig oder kontinuierlich ausgeführt, einschließlich ETL-Aufträgen und Streamingvorgängen.
Grundlegendes zu UDF-Typen
Wählen Sie einen UDF-Typ auf den folgenden Registerkarten aus, um eine Beschreibung, ein Beispiel, und einen Link anzuzeigen, um mehr zu erfahren.
Skalare UDF
Skalare UDFs werden für eine einzelne Zeile ausgeführt und geben einen einzelnen Ergebniswert für jede Zeile zurück. Sie können von Unity Catalog verwaltet werden oder sitzungsbezogen sein.
Im folgenden Beispiel wird eine skalare UDF verwendet, um die Länge der einzelnen Namen in einer name
Spalte zu berechnen und den Wert in einer neuen Spalte name_length
hinzuzufügen.
+-------+-------+
| name | score |
+-------+-------+
| alice | 10.0 |
| bob | 20.0 |
| carol | 30.0 |
| dave | 40.0 |
| eve | 50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION main.test.get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);
-- Use the UDF in a SQL query
SELECT name, main.test.get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name | score | name_length |
+-------+-------+-------------+
| alice | 10.0 | 5 |
| bob | 20.0 | 3 |
| carol | 30.0 | 5 |
| dave | 40.0 | 4 |
| eve | 50.0 | 3 |
+-------+-------+-------------+
So implementieren Sie dies in einem Databricks-Notizbuch mithilfe von PySpark:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
@udf(returnType=IntegerType())
def get_name_length(name):
return len(name)
df = df.withColumn("name_length", get_name_length(df.name))
# Show the result
display(df)
Siehe benutzerdefinierte Funktionen (USER-Defined Functions, UDFs) im Unity-Katalog und benutzerdefinierte Skalarfunktionen – Python.
Batchskalare UDFs
Verarbeiten von Daten in Batches bei gleichzeitiger Beibehaltung der 1:1-Eingabe-/Ausgabezeilenparität. Dadurch wird der Aufwand für Zeilen-nach-Zeilen-Vorgänge für die Verarbeitung großer Datenmengen reduziert. Batch-UDFs behalten auch den Zustand zwischen Batches bei, um effizienter ausgeführt zu werden, Ressourcen wiederzuverwenden und komplexe Berechnungen zu verarbeiten, die Kontext über Datenblöcke hinweg benötigen.
Sie können von Unity Catalog verwaltet werden oder sitzungsbezogen sein.
Der folgende Batch Unity Catalog Python UDF berechnet den BMI während der Verarbeitung von Reihenstapeln.
+-------------+-------------+
| weight_kg | height_m |
+-------------+-------------+
| 90 | 1.8 |
| 77 | 1.6 |
| 50 | 1.5 |
+-------------+-------------+
%sql
CREATE OR REPLACE FUNCTION main.test.calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for weight_series, height_series in batch_iter:
yield weight_series / (height_series ** 2)
$$;
select main.test.calculate_bmi_pandas(cast(70 as double), cast(1.8 as double));
+--------+
| BMI |
+--------+
| 27.8 |
| 30.1 |
| 22.2 |
+--------+
Siehe benutzerdefinierte Funktionen (USER-Defined Functions, UDFs) in Unity Catalog und Batch Python User-defined functions (UDFs) im Unity-Katalog.
Nicht skalare UDFs
Nicht-skalare UDFs verarbeiten vollständige Datensätze/Spalten mit flexiblen Eingabe-/Ausgabeverhältnissen (1:N oder viele:viele).
Sitzungsbezogene Batch-Pandas-UDFs können die folgenden Typen haben:
- Datenreihe zu Datenreihe
- Iterator der Datenreihe zu Iterator der Datenreihe
- Iterator von mehreren Datenreihen zu Iterator der Datenreihe
- Datenreihe zu skalar
Im Folgenden sehen Sie ein Beispiel für ein Pandas-UDF des Typs „Datenreihe zu Datenreihe“.
from pyspark.sql.functions import pandas_udf
import pandas as pd
df = spark.createDataFrame([(70, 1.75), (80, 1.80), (60, 1.65)], ["Weight", "Height"])
@pandas_udf("double")
def calculate_bmi_pandas(weight: pd.Series, height: pd.Series) -> pd.Series:
return weight / (height ** 2)
df.withColumn("BMI", calculate_bmi_pandas(df["Weight"], df["Height"])).display()
Siehe benutzerdefinierte Pandas-Funktionen.
UDAF
UDAFs arbeiten mit mehreren Zeilen und geben ein einzelnes aggregiertes Ergebnis zurück. UDAFs sind nur sitzungsbezogen.
Im folgenden UDAF-Beispiel werden die Bewertungen anhand der Namenslänge aggregiert.
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd
# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
return scores.sum()
# Group by name length and aggregate
result_df = (df.groupBy("name_length")
.agg(total_score_udf(df["score"]).alias("total_score")))
display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
| 3 | 70.0 |
| 4 | 40.0 |
| 5 | 40.0 |
+-------------+-------------+
Siehe pandas benutzerdefinierte Funktionen für Python und Benutzerdefinierte Aggregatfunktionen - Scala.
Benutzerdefinierte Tabellenfunktionen (UDTFs)
Ein UDTF akzeptiert ein oder mehrere Eingabeargumente und gibt für jede Eingabezeile mehrere Zeilen (und möglicherweise mehrere Spalten) zurück. UDTFs sind nur sitzungsbezogen.
Im folgenden Beispiel entspricht jeder Wert in der Scorespalte einer Liste von Kategorien. Die UDTF teilt die durch Trennzeichen getrennte Liste in mehrere Zeilen auf.
+-------+-------+-----------------+
| name | score | categories |
+-------+-------+-----------------+
| alice | 10.0 | math,science |
| bob | 20.0 | history,math |
| carol | 30.0 | science,history |
| dave | 40.0 | math,art |
| eve | 50.0 | science,art |
+-------+-------+-----------------+
from pyspark.sql.functions import udtf
@udtf(returnType="name: string, score: int, category: string")
class ScoreCategoriesUDTF:
def eval(self, name: str, score: int, categories: str):
category_list = categories.split(',')
for category in category_list:
yield (name, score, category)
ScoreCategoriesUDTF(lit("Alice"), lit(85), lit("Math,Science,English")).display()
+-------+-------+----------+
| name | score | category |
+-------+-------+----------+
| alice | 10.0 | math |
| alice | 10.0 | science |
| bob | 20.0 | history |
| bob | 20.0 | math |
| carol | 30.0 | science |
| carol | 30.0 | history |
| dave | 40.0 | math |
| dave | 40.0 | art |
| eve | 50.0 | science |
| eve | 50.0 | art |
+-------+-------+----------+
Siehe Benutzerdefinierte Tabellenfunktionen (User-Defined Table Functions, UDTFs) in Python.
Von Unity Catalog verwaltete UDFs im Vergleich mit sitzungsbezogenen UDFs
Unity Catalog Python UDFs und Batch Unity Catalog Python UDFs werden im Unity Catalog gespeichert, um die Governance, Wiederverwendung und Auffindbarkeit zu verbessern. Alle anderen UDFs sind sitzungsbasiert, d. h., sie werden in einem Notizbuch oder Auftrag definiert und sind auf die aktuelle SparkSession-Sitzung bezogen. Sie können UDFs mit Sitzungsbereich mithilfe von Scala oder Python definieren und auf sie zugreifen.
Spickzettel zu von Unity Catalog verwalteten UDFs
Durch den Unity-Katalog geregelte UDFs können benutzerdefinierte Funktionen definiert, verwendet, sicher freigegeben und in computerübergreifenden Umgebungen gesteuert werden. Weitere Informationen finden Sie unter User-defined functions (UDFs) in Unity Catalog (Benutzerdefinierte Funktionen (UDFs) in Unity Catalog).
UDF-Typ | Unterstütztes Compute | BESCHREIBUNG |
---|---|---|
Unity Catalog Python UDF |
|
Definieren Sie eine UDF in Python, und registrieren Sie sie im Unity-Katalog für Governance. Skalare UDFs werden für eine einzelne Zeile ausgeführt und geben einen einzelnen Ergebniswert für jede Zeile zurück. |
Batch-UDF Unity Catalog/Python |
|
Definieren Sie eine UDF in Python, und registrieren Sie sie im Unity-Katalog für Governance. Batchvorgänge für mehrere Werte und Zurückgeben mehrerer Werte. Reduziert den Aufwand von Vorgängen, die Zeile für Zeile ausgeführt werden, bei der umfangreichen Datenverarbeitung. |
Spickzettel zu sitzungsbezogenen UDFs für benutzerisoliertes Compute
Sitzungsbezogene UDFs werden in einem Notizbuch oder Auftrag definiert und sind auf die aktuelle SparkSession-Sitzung bezogen. Sie können UDFs mit Sitzungsbereich mithilfe von Scala oder Python definieren und auf sie zugreifen.
UDF-Typ | Unterstütztes Compute | BESCHREIBUNG |
---|---|---|
Python, skalar |
|
Skalare UDFs werden für eine einzelne Zeile ausgeführt und geben einen einzelnen Ergebniswert für jede Zeile zurück. |
Python nicht skalar |
|
Nicht skalare UDFs enthalten pandas_udf , mapInPandas , mapInArrow , applyInPandas . Pandas-UDFs verwenden Apache Arrow, um Daten und Pandas zu übertragen, um mit den Daten zu arbeiten. Pandas UDFs unterstützen vektorisierte Vorgänge, die die Leistung über zeilenweise skalare UDFs erheblich steigern können. |
Python-UDTFs |
|
Ein UDTF akzeptiert ein oder mehrere Eingabeargumente und gibt für jede Eingabezeile mehrere Zeilen (und möglicherweise mehrere Spalten) zurück. |
Skalare Scala-UDFs |
|
Skalare UDFs werden für eine einzelne Zeile ausgeführt und geben einen einzelnen Ergebniswert für jede Zeile zurück. |
Scala-UDAFs |
|
UDAFs arbeiten mit mehreren Zeilen und geben ein einzelnes aggregiertes Ergebnis zurück. |
Leistungsüberlegungen
Integrierte Funktionen und SQL UDFs sind die effizientesten Optionen.
Scala UDFs sind im Allgemeinen schneller als Python UDFs.
- Nicht isolierte Scala UDFs werden auf dem virtuellen Java-Computer (JVM) ausgeführt, sodass sie den Aufwand für das Verschieben von Daten in und aus dem JVM vermeiden.
- Isolierte Scala UDFs müssen Daten in und aus dem JVM verschieben, aber sie können immer noch schneller sein als Python-UDFs, da sie den Speicher effizienter verarbeiten.
Python UDFs und Pandas UDFs neigen dazu, langsamer zu sein als Scala UDFs, da sie Daten serialisieren und aus dem JVM in den Python-Dolmetscher verschoben werden müssen.
- Pandas UDFs sind bis zu 100x schneller als Python UDFs, da sie Apache Arrow verwenden, um die Serialisierungskosten zu reduzieren.