Poznámka
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Uživatelem definované funkce (UDF) umožňují opakovaně používat a sdílet kód, který rozšiřuje integrované funkce v Azure Databricks. Funkce definované uživatelem (UDFs) slouží k provádění konkrétních úloh, jako jsou složité výpočty, transformace nebo vlastní manipulace s daty.
Kdy použít funkci UDF vs. Apache Spark?
UDF používejte pro logiku, která se obtížně vyjadřuje pomocí integrovaných funkcí Apache Sparku. Integrované funkce Apache Sparku jsou optimalizované pro distribuované zpracování a nabízejí lepší výkon ve velkém měřítku. Další informace naleznete v tématu Functions.
Databricks doporučuje UDF pro ad hoc dotazy, manuální čištění dat, průzkumnou analýzu dat a operace s malými až středně velkými datovými sadami. Mezi běžné případy použití uživatelsky definovaných funkcí patří šifrování dat, dešifrování, hashování, analýza JSON a ověřování.
Použijte metody Apache Sparku pro operace s velmi velkými datovými sadami a všechny úlohy běží pravidelně nebo nepřetržitě, včetně úloh ETL a operací streamování.
Porozumět typům uživatelem definovaným funkcím
Na následujících kartách vyberte typ UDF pro zobrazení popisu, příkladu a odkazu pro více informací.
Skalární UDF
Skalární UDF pracují na jednom řádku a vracejí jednu výslednou hodnotu pro každý řádek. Mohou být řízeny pomocí katalogu Unity nebo zaměřeny na relaci.
Následující příklad používá skalární UDF k výpočtu délky každého názvu ve name
sloupci a přidání hodnoty do nového sloupce name_length
.
+-------+-------+
| name | score |
+-------+-------+
| alice | 10.0 |
| bob | 20.0 |
| carol | 30.0 |
| dave | 40.0 |
| eve | 50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION main.test.get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);
-- Use the UDF in a SQL query
SELECT name, main.test.get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name | score | name_length |
+-------+-------+-------------+
| alice | 10.0 | 5 |
| bob | 20.0 | 3 |
| carol | 30.0 | 5 |
| dave | 40.0 | 4 |
| eve | 50.0 | 3 |
+-------+-------+-------------+
K provedení v poznámkovém bloku Databricks pomocí PySpark:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
@udf(returnType=IntegerType())
def get_name_length(name):
return len(name)
df = df.withColumn("name_length", get_name_length(df.name))
# Show the result
display(df)
Viz uživatelem definované funkce (UDF) v katalogu Unity a uživatelem definované skalární funkce – Python.
Skalární uživatele služby Batch
Zpracovávání dat v dávkách se zachováním rovnosti poměru 1:1 mezi vstupními a výstupními řádky. Tím se sníží náročnost operací zpracovávaných po jednotlivých řádcích pro zpracování velkých objemů dat. Uživatelsky definované funkce Batch také udržují stav mezi dávkovými zpracováními, aby se spouštěly efektivněji, opakovaně používaly prostředky a zpracovaly složité výpočty, které potřebují kontext napříč bloky.
Mohou být řízeny pomocí katalogu Unity nebo zaměřeny na relaci.
Následující funkce Batch Unity Catalog Python UDF vypočítá BMI při zpracování šarží řádků.
+-------------+-------------+
| weight_kg | height_m |
+-------------+-------------+
| 90 | 1.8 |
| 77 | 1.6 |
| 50 | 1.5 |
+-------------+-------------+
%sql
CREATE OR REPLACE FUNCTION main.test.calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for weight_series, height_series in batch_iter:
yield weight_series / (height_series ** 2)
$$;
select main.test.calculate_bmi_pandas(cast(70 as double), cast(1.8 as double));
+--------+
| BMI |
+--------+
| 27.8 |
| 30.1 |
| 22.2 |
+--------+
Viz uživatelem definované funkce (UDF) v katalogu Unity a uživatelem definované funkce Pythonu (UDF) v katalogu Unity.
Jiné než skalární funkce definované uživatelem
Nes skalární funkce definované uživatelem pracují s celými datovými sadami a sloupci s flexibilními vstupními a výstupními poměry (1:N nebo N:N).
Uživatelem definované uživatelem knihovny pandas v rámci relace můžou být následující typy:
- Serie na serii
- Iterátor řady k iterátoru řad
- Iterátor více sérií na iterátor série
- Ze série na skalár
Následuje příklad pandas UDF typu Series na Series.
from pyspark.sql.functions import pandas_udf
import pandas as pd
df = spark.createDataFrame([(70, 1.75), (80, 1.80), (60, 1.65)], ["Weight", "Height"])
@pandas_udf("double")
def calculate_bmi_pandas(weight: pd.Series, height: pd.Series) -> pd.Series:
return weight / (height ** 2)
df.withColumn("BMI", calculate_bmi_pandas(df["Weight"], df["Height"])).display()
Vizte funkce pandas definované uživatelem.
UDAF
UDAF pracují s více řádky a vrací jeden agregovaný výsledek. Funkce UDAFs jsou omezeny pouze na relaci.
Následující příklad UDAF agreguje skóre podle délky názvu.
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd
# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
return scores.sum()
# Group by name length and aggregate
result_df = (df.groupBy("name_length")
.agg(total_score_udf(df["score"]).alias("total_score")))
display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
| 3 | 70.0 |
| 4 | 40.0 |
| 5 | 40.0 |
+-------------+-------------+
Viz uživatelem definované funkce v pandas pro Python a agregační funkce definované uživatelem ve Scale.
UDTF
UDTF přebírá jeden nebo více vstupních argumentů a vrací více řádků (a případně více sloupců) pro každý vstupní řádek. Uživatelem definované tabulkové funkce (UDTF) jsou omezené pouze na relaci.
V následujícím příkladu každá hodnota ve sloupci skóre odpovídá seznamu kategorií. UDTF rozdělí seznam oddělený čárkami na několik řádků.
+-------+-------+-----------------+
| name | score | categories |
+-------+-------+-----------------+
| alice | 10.0 | math,science |
| bob | 20.0 | history,math |
| carol | 30.0 | science,history |
| dave | 40.0 | math,art |
| eve | 50.0 | science,art |
+-------+-------+-----------------+
from pyspark.sql.functions import udtf
@udtf(returnType="name: string, score: int, category: string")
class ScoreCategoriesUDTF:
def eval(self, name: str, score: int, categories: str):
category_list = categories.split(',')
for category in category_list:
yield (name, score, category)
ScoreCategoriesUDTF(lit("Alice"), lit(85), lit("Math,Science,English")).display()
+-------+-------+----------+
| name | score | category |
+-------+-------+----------+
| alice | 10.0 | math |
| alice | 10.0 | science |
| bob | 20.0 | history |
| bob | 20.0 | math |
| carol | 30.0 | science |
| carol | 30.0 | history |
| dave | 40.0 | math |
| dave | 40.0 | art |
| eve | 50.0 | science |
| eve | 50.0 | art |
+-------+-------+----------+
Viz uživatelem definované funkce tabulek v Pythonu (UDTFs).
Katalog Unity řízené vs. relací ohraničené UDFs
Uživatelem definované funkce Pythonu v Unity Catalog a dávkové uživatelem definované funkce Pythonu v Unity Catalog jsou uchovávány v Unity Catalog pro lepší správu, opakované použití a zjistitelnost. Všechny ostatní funkce definované uživatelem jsou založené na relacích, což znamená, že jsou definované v poznámkovém bloku nebo úloze a jsou vymezeny na aktuální SparkSession. Pomocí jazyka Scala nebo Pythonu můžete definovat a používat UDFs s relace ohraničeným rozsahem.
Stručný průvodce k řídícím uživatelem definovaným funkcím v Katalogu Unity
UDF, které se řídí službou Unity Catalog, umožňují definovat, používat, bezpečně sdílet a řídit vlastní funkce napříč výpočetními prostředími. Viz uživatelem definované funkce (UDF) v katalogu Unity.
Typ UDF | Podporované výpočetní prostředky | Popis |
---|---|---|
Uživatelská definovaná funkce Python v katalogu Unity |
|
Definujte uživatelem definované funkce v Pythonu a zaregistrujte ho v katalogu Unity pro správu. Skalární UDF pracují na jednom řádku a vracejí jednu výslednou hodnotu pro každý řádek. |
Dávkový katalog Unity pro Python UDF |
|
Definujte uživatelem definované funkce v Pythonu a zaregistrujte ho v katalogu Unity pro správu. Dávkové operace s více hodnotami a vrací více hodnot. Snižuje náklady spojené s operacemi po řádcích při zpracovávání velkých objemů dat. |
Stručná příručka k UDF s omezeným oborem relace pro uživatelsky izolovaný výpočet
Relativně definované funkce (UDFs) jsou specifikovány v rámci poznámkového bloku nebo úlohy a jsou omezeny na aktuální SparkSession. Pomocí jazyka Scala nebo Pythonu můžete definovat a používat UDFs s relace ohraničeným rozsahem.
Typ UDF | Podporované výpočetní prostředky | Popis |
---|---|---|
Skalární jazyk Python |
|
Skalární UDF pracují na jednom řádku a vracejí jednu výslednou hodnotu pro každý řádek. |
Neskalární Python |
|
Mezi ne skalární funkce definované uživatelem patří pandas_udf , mapInPandas , mapInArrow , applyInPandas . Funkce Pandas UDFs používají Apache Arrow k přenosu dat a využívají knihovnu pandas pro práci s těmito daty. Funkce pandas UDF podporují vektorizované operace, které mohou výrazně zvýšit výkon oproti skalárním UDF zpracovávaným řádek po řádku. |
Definované funkce Pythonu |
|
UDTF přebírá jeden nebo více vstupních argumentů a vrací více řádků (a případně více sloupců) pro každý vstupní řádek. |
Skalární uživatelem definované funkce Scala |
|
Skalární UDF pracují na jednom řádku a vracejí jednu výslednou hodnotu pro každý řádek. |
Scala UDAFs |
|
UDAF pracují s více řádky a vrací jeden agregovaný výsledek. |
Důležité informace o výkonu
Vestavěné funkce a SQL UDF jsou nejúčinnějšími možnostmi.
Scala UDF jsou obecně rychlejší než Python UDF.
- Unisolated Scala UDF běží na virtuálním počítači Java Virtual Machine (JVM), aby se vyhnuli režijním nákladům na přesun dat do a z prostředí JVM.
- Izolované Scala UDF musí přesouvat data do a z JVM, ale můžou být rychlejší než Python UDF, protože efektivněji zpracovávají paměť.
UDF funkce Pythonu a UDF funkce pandas bývají pomalejší než UDF funkce Scala, protože musí serializovat data a přesouvat je z JVM do Pythonového interpretu.
- Pandas UDFs jsou až 100x rychlejší než Python UDFs, protože používají Apache Arrow, aby snížily náklady na serializaci.