Wat zijn door de gebruiker gedefinieerde functies (UDF's)?
Met door de gebruiker gedefinieerde functies (UDF's) kunt u code hergebruiken en delen die de ingebouwde functionaliteit op Azure Databricks uitbreidt. UDF's gebruiken om specifieke taken uit te voeren, zoals complexe berekeningen, transformaties of aangepaste gegevensbewerkingen.
Notitie
Op clusters met de modus voor gedeelde toegang worden Scalaire UDF's van Python ondersteund in Databricks Runtime 13.3 LTS en hoger, terwijl Scala UDF's worden ondersteund in Databricks Runtime 14.2 en hoger.
Python scalaire UDF's kunnen worden geregistreerd in Unity Catalog met behulp van SQL-syntaxis in Databricks Runtime 13.3 LTS en hoger. Zie door de gebruiker gedefinieerde functies (UDF's) in Unity Catalog.
Wanneer moet u een UDF gebruiken?
UDF's gebruiken voor logica die moeilijk te uitdrukken is met ingebouwde Apache Spark-functies. Ingebouwde Apache Spark-functies zijn geoptimaliseerd voor gedistribueerde verwerking en bieden over het algemeen betere prestaties op schaal. Zie Functions voor meer informatie.
Databricks raadt UDF's aan voor ad-hocquery's, handmatige gegevensopschoning, verkennende gegevensanalyse en bewerkingen voor kleine tot middelgrote gegevenssets. Veelvoorkomende gebruiksvoorbeelden voor UDF's zijn onder andere gegevensversleuteling en ontsleuteling, hashing, JSON-parsering en validatie.
Gebruik Apache Spark-methoden voor bewerkingen op zeer grote gegevenssets en workloads die regelmatig of continu worden uitgevoerd, waaronder ETL-taken en streamingbewerkingen.
Geregistreerde en sessie-scoped UDF's
UDF's die zijn gemaakt met SQL, worden geregistreerd in Unity Catalog en hebben bijbehorende machtigingen, terwijl UDF's die in uw notebook zijn gemaakt, sessiegebaseerd zijn en binnen het bereik van de huidige SparkSession vallen.
U kunt UDF's op basis van sessies definiëren en openen met behulp van elke taal die wordt ondersteund door Azure Databricks. UDF's kunnen scalaire of niet-scalaire zijn.
Notitie
Momenteel zijn alleen SQL- en Python scalaire UDF's die zijn geregistreerd in Unity Catalog beschikbaar in DBSQL.
Scalaire UDF's
Scalaire UDF's worden uitgevoerd op één rij en retourneren één waarde voor elke rij. In het volgende voorbeeld wordt een scalaire UDF gebruikt om de lengte van elke naam in een name
kolom te berekenen en de waarde in een nieuwe kolom name_length
toe te voegen:
+-------+-------+
| name | score |
+-------+-------+
| alice | 10.0 |
| bob | 20.0 |
| carol | 30.0 |
| dave | 40.0 |
| eve | 50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);
-- Use the UDF in a SQL query
SELECT name, get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name | score | name_length |
+-------+-------+-------------+
| alice | 10.0 | 5 |
| bob | 20.0 | 3 |
| carol | 30.0 | 5 |
| dave | 40.0 | 4 |
| eve | 50.0 | 3 |
+-------+-------+-------------+
Ga als volgt te werk om dit te implementeren in een Databricks-notebook met behulp van PySpark:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
@udf(returnType=IntegerType())
def get_name_length(name):
return len(name)
df = df.withColumn("name_length", get_name_length(df.name))
# Show the result
display(df)
Zie Door de gebruiker gedefinieerde functies (UDF's) in Unity Catalog en door de gebruiker gedefinieerde scalaire functies - Python voor meer informatie.
Door de gebruiker gedefinieerde statistische functies (UDAF's)
Door de gebruiker gedefinieerde statistische functies (UDAF's) werken op meerdere rijen en retourneren één samengevoegd resultaat. In het volgende voorbeeld wordt een UDAF gedefinieerd waarmee scores worden geaggregeerd.
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd
# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
return scores.sum()
# Group by name length and aggregate
result_df = (df.groupBy("name_length")
.agg(total_score_udf(df["score"]).alias("total_score")))
display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
| 3 | 70.0 |
| 4 | 40.0 |
| 5 | 40.0 |
+-------------+-------------+
Zie door de gebruiker gedefinieerde pandas-functies voor Door de gebruiker gedefinieerde statistische functies voor Python en door de gebruiker gedefinieerde statistische functies - Scala.
Door de gebruiker gedefinieerde Python-tabelfuncties (UDF's)
Belangrijk
Deze functie is beschikbaar als openbare preview.
Notitie
Python UDFS zijn beschikbaar in Databricks Runtime 14.3 LTS en hoger.
Door de gebruiker gedefinieerde tabelfuncties (UDF's) van Python kunnen meerdere rijen en kolommen retourneren voor elke invoerrij. In het volgende voorbeeld komt elke waarde in de scorekolom overeen met een lijst met categorieën. Een UDTF wordt gedefinieerd om de door komma's gescheiden lijst in meerdere rijen te splitsen. Zie door de gebruiker gedefinieerde Tabelfuncties van Python (UDDF's)
+-------+-------+-----------------+
| name | score | categories |
+-------+-------+-----------------+
| alice | 10.0 | math,science |
| bob | 20.0 | history,math |
| carol | 30.0 | science,history |
| dave | 40.0 | math,art |
| eve | 50.0 | science,art |
+-------+-------+-----------------+
from pyspark.sql.functions import udtf
@udtf(returnType="score: int, categories: string, name: string")
class ScoreCategoriesUDTF:
def eval(self, name: str, score: float, categories: str):
category_list = categories.split(',')
for category in category_list:
yield (name, score, category)
# Apply the UDTF
result_df = df.select(ScoreCategoriesUDTF(df.score, df.categories, df.name))
display(result_df)
+-------+-------+----------+
| name | score | category |
+-------+-------+----------+
| alice | 10.0 | math |
| alice | 10.0 | science |
| bob | 20.0 | history |
| bob | 20.0 | math |
| carol | 30.0 | science |
| carol | 30.0 | history |
| dave | 40.0 | math |
| dave | 40.0 | art |
| eve | 50.0 | science |
| eve | 50.0 | art |
+-------+-------+----------+
Prestatieoverwegingen
- Ingebouwde functies en SQL UDF's zijn de meest efficiënte optie.
- Scala UDF's zijn over het algemeen sneller wanneer ze worden uitgevoerd binnen de JVM (Java Virtual Machine) en voorkomen dat de overhead van het verplaatsen van gegevens naar en van de JVM wordt vermeden.
- Python UDF's en Pandas UDF's zijn meestal langzamer dan Scala UDF's, omdat ze vereisen dat gegevens worden geserialiseerd en verplaatst van de JVM naar de Python-interpreter. Pandas UDF's tot 100x sneller dan Python UDF's omdat ze Apache Arrow gebruiken om serialisatiekosten te verlagen.