Co to są funkcje zdefiniowane przez użytkownika (UDF)?
Funkcje zdefiniowane przez użytkownika (UDF) umożliwiają ponowne używanie i udostępnianie kodu rozszerzającego wbudowane funkcje usługi Azure Databricks. Funkcje zdefiniowane przez użytkownika umożliwiają wykonywanie określonych zadań, takich jak złożone obliczenia, przekształcenia lub niestandardowe manipulowanie danymi.
Uwaga
W klastrach z trybem dostępu współużytkowanego funkcje zdefiniowane przez użytkownika języka Python są obsługiwane w środowisku Databricks Runtime 13.3 LTS i nowszym, natomiast funkcje zdefiniowane przez użytkownika języka Scala są obsługiwane w środowisku Databricks Runtime 14.2 lub nowszym.
Skalarne funkcje zdefiniowane przez użytkownika języka Python można zarejestrować w katalogu aparatu Unity przy użyciu składni SQL w środowisku Databricks Runtime 13.3 LTS lub nowszym. Zobacz Funkcje zdefiniowane przez użytkownika (UDF) w wykazie aparatu Unity.
Kiedy należy użyć funkcji zdefiniowanej przez użytkownika?
Używaj funkcji zdefiniowanych przez użytkownika dla logiki, która jest trudna do wyrażenia za pomocą wbudowanych funkcji platformy Apache Spark. Wbudowane funkcje platformy Apache Spark są zoptymalizowane pod kątem przetwarzania rozproszonego i ogólnie zapewniają lepszą wydajność na dużą skalę. Aby uzyskać więcej informacji, zobacz Funkcje.
Usługa Databricks zaleca funkcje zdefiniowane przez użytkownika na potrzeby zapytań ad hoc, czyszczenia danych ręcznych, eksploracyjnej analizy danych i operacji na małych i średnich zestawach danych. Typowe przypadki użycia funkcji zdefiniowanych przez użytkownika obejmują szyfrowanie i odszyfrowywanie danych, skróty, analizowanie danych JSON i walidację.
Użyj metod platformy Apache Spark na potrzeby operacji na bardzo dużych zestawach danych i wszystkich obciążeniach, które są uruchamiane regularnie lub stale, w tym zadania ETL i operacje przesyłania strumieniowego.
Zarejestrowane i ograniczone funkcje zdefiniowane przez użytkownika w zakresie sesji
Funkcje zdefiniowane przez użytkownika utworzone przy użyciu języka SQL są zarejestrowane w wykazie aparatu Unity i mają skojarzone uprawnienia, natomiast funkcje zdefiniowane przez użytkownika utworzone w notesie są oparte na sesji i są ograniczone do bieżącej platformy SparkSession.
Funkcje zdefiniowane przez użytkownika oparte na sesji i uzyskiwanie do ich dostępu można definiować przy użyciu dowolnego języka obsługiwanego przez usługę Azure Databricks. Funkcje zdefiniowane przez użytkownika mogą być skalarne lub nie skalarne.
Uwaga
Obecnie w bazie danych DBSQL są dostępne tylko funkcje zdefiniowane przez użytkownika sql i Python zarejestrowane w katalogu aparatu Unity.
Skalarne funkcje zdefiniowane przez użytkownika
Skalarne funkcje zdefiniowane przez użytkownika działają w jednym wierszu i zwracają pojedynczą wartość dla każdego wiersza. W poniższym przykładzie użyto skalarnej funkcji zdefiniowanej przez użytkownika do obliczenia długości każdej nazwy w name
kolumnie i dodania wartości w nowej kolumnie name_length
:
+-------+-------+
| name | score |
+-------+-------+
| alice | 10.0 |
| bob | 20.0 |
| carol | 30.0 |
| dave | 40.0 |
| eve | 50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);
-- Use the UDF in a SQL query
SELECT name, get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name | score | name_length |
+-------+-------+-------------+
| alice | 10.0 | 5 |
| bob | 20.0 | 3 |
| carol | 30.0 | 5 |
| dave | 40.0 | 4 |
| eve | 50.0 | 3 |
+-------+-------+-------------+
Aby zaimplementować to w notesie usługi Databricks przy użyciu narzędzia PySpark:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
@udf(returnType=IntegerType())
def get_name_length(name):
return len(name)
df = df.withColumn("name_length", get_name_length(df.name))
# Show the result
display(df)
Aby uzyskać więcej informacji, zobacz Funkcje zdefiniowane przez użytkownika (UDF) w katalogu aparatu Unity i funkcje skalarne zdefiniowane przez użytkownika — Python.
Funkcje agregujące zdefiniowane przez użytkownika (UDAF)
Funkcje agregujące zdefiniowane przez użytkownika (UDAFs) działają na wielu wierszach i zwracają jeden zagregowany wynik. W poniższym przykładzie zdefiniowano interfejs UDAF, który agreguje wyniki.
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd
# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
return scores.sum()
# Group by name length and aggregate
result_df = (df.groupBy("name_length")
.agg(total_score_udf(df["score"]).alias("total_score")))
display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
| 3 | 70.0 |
| 4 | 40.0 |
| 5 | 40.0 |
+-------------+-------------+
Zobacz funkcje zdefiniowane przez użytkownika biblioteki pandas dla języka Python i funkcji agregujących zdefiniowanych przez użytkownika — Scala.
Funkcje tabeli zdefiniowane przez użytkownika w języku Python (UDF)
Ważne
Ta funkcja jest dostępna w publicznej wersji zapoznawczej.
Uwaga
Funkcje zdefiniowane przez użytkownika języka Python są dostępne w środowisku Databricks Runtime 14.3 LTS i nowszym.
Funkcje tabeli zdefiniowane przez użytkownika języka Python (UDTFs) mogą zwracać wiele wierszy i kolumn dla każdego wiersza wejściowego. W poniższym przykładzie każda wartość w kolumnie score odpowiada liście kategorii. Definiowana jest funkcja UDTF, aby podzielić listę rozdzielaną przecinkami na wiele wierszy. Zobacz Funkcje tabeli zdefiniowane przez użytkownika w języku Python (UDF)
+-------+-------+-----------------+
| name | score | categories |
+-------+-------+-----------------+
| alice | 10.0 | math,science |
| bob | 20.0 | history,math |
| carol | 30.0 | science,history |
| dave | 40.0 | math,art |
| eve | 50.0 | science,art |
+-------+-------+-----------------+
from pyspark.sql.functions import udtf
@udtf(returnType="score: int, categories: string, name: string")
class ScoreCategoriesUDTF:
def eval(self, name: str, score: float, categories: str):
category_list = categories.split(',')
for category in category_list:
yield (name, score, category)
# Apply the UDTF
result_df = df.select(ScoreCategoriesUDTF(df.score, df.categories, df.name))
display(result_df)
+-------+-------+----------+
| name | score | category |
+-------+-------+----------+
| alice | 10.0 | math |
| alice | 10.0 | science |
| bob | 20.0 | history |
| bob | 20.0 | math |
| carol | 30.0 | science |
| carol | 30.0 | history |
| dave | 40.0 | math |
| dave | 40.0 | art |
| eve | 50.0 | science |
| eve | 50.0 | art |
+-------+-------+----------+
Zagadnienia dotyczące wydajności
- Wbudowane funkcje i funkcje zdefiniowane przez użytkownika SQL są najbardziej wydajną opcją.
- Funkcje zdefiniowane przez użytkownika języka Scala są zwykle szybsze, ponieważ są wykonywane na maszynie wirtualnej Java (JVM) i unikają obciążeń związanych z przenoszeniem danych do i z maszyny wirtualnej JVM.
- Funkcje zdefiniowane przez użytkownika języka Python i funkcje zdefiniowane przez użytkownika biblioteki Pandas są zwykle wolniejsze niż funkcje UDF języka Scala, ponieważ wymagają serializacji i przeniesienia danych z maszyny wirtualnej JVM do interpretera języka Python. Funkcje zdefiniowane przez użytkownika biblioteki Pandas o szybkości do 100 razy szybsze niż funkcje zdefiniowane przez użytkownika języka Python, ponieważ używają narzędzia Apache Arrow w celu zmniejszenia kosztów serializacji.