Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Le funzioni definite dall'utente consentono di riutilizzare e condividere codice che estende le funzionalità predefinite in Azure Databricks. Usare le funzioni definite dall'utente (UDF) per eseguire attività specifiche, ad esempio calcoli complessi, trasformazioni o manipolazioni di dati personalizzate.
Quando usare una funzione UDF e Apache Spark?
Usare funzioni definite dall'utente per la logica difficile da esprimere con le funzioni predefinite di Apache Spark. Le funzioni apache Spark predefinite sono ottimizzate per l'elaborazione distribuita e offrono prestazioni migliori su larga scala. Per ulteriori informazioni, vedere la sezione Funzioni.
Databricks consiglia l'utilizzo delle UDF (funzioni definite dall'utente) per le query ad hoc, la pulizia manuale dei dati, l'analisi esplorativa dei dati e le operazioni su set di dati di dimensioni piccole o medie. I casi d'uso comuni per le funzioni definite dall'utente includono crittografia dei dati, decrittografia, hashing, analisi JSON e convalida.
Usare i metodi Apache Spark per le operazioni su set di dati di grandi dimensioni e tutti i carichi di lavoro eseguiti regolarmente o continuamente, inclusi processi ETL e operazioni di streaming.
Comprendere i tipi di UDF (funzioni definite dall'utente)
Seleziona un tipo UDF dalle schede seguenti per visualizzare una descrizione, un esempio e un collegamento per ulteriori informazioni.
Funzione definita dall'utente scalare
Le funzioni scalari definite dall'utente operano su una singola riga e restituiscono un singolo valore per ogni riga. Possono essere regolati dal catalogo Unity o con ambito sessione.
Nell'esempio seguente viene usata una funzione definita dall'utente scalare per calcolare la lunghezza di ogni nome in una name
colonna e aggiungere il valore in una nuova colonna name_length
.
+-------+-------+
| name | score |
+-------+-------+
| alice | 10.0 |
| bob | 20.0 |
| carol | 30.0 |
| dave | 40.0 |
| eve | 50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION main.test.get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);
-- Use the UDF in a SQL query
SELECT name, main.test.get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name | score | name_length |
+-------+-------+-------------+
| alice | 10.0 | 5 |
| bob | 20.0 | 3 |
| carol | 30.0 | 5 |
| dave | 40.0 | 4 |
| eve | 50.0 | 3 |
+-------+-------+-------------+
Per implementare questa funzionalità in un notebook di Databricks usando PySpark:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
@udf(returnType=IntegerType())
def get_name_length(name):
return len(name)
df = df.withColumn("name_length", get_name_length(df.name))
# Show the result
display(df)
Vedere Funzioni definite dall'utente (UDF) nel catalogo unity e funzioni scalari definite dall'utente - Python.
Funzioni scalari definite dall'utente in modalità batch
Elaborare i dati in batch mantenendo la parità di riga di input/output 1:1. In questo modo si riduce il sovraccarico delle operazioni di riga per riga per l'elaborazione dei dati su larga scala. Le funzioni definite dall'utente batch mantengono anche lo stato tra batch per eseguire in modo più efficiente, riutilizzare le risorse e gestire calcoli complessi che necessitano di contesto tra blocchi di dati.
Possono essere regolati dal catalogo Unity o con ambito sessione.
La seguente funzione definita dall'utente Python del catalogo Batch Unity calcola l'IMC mentre elabora serie di righe:
+-------------+-------------+
| weight_kg | height_m |
+-------------+-------------+
| 90 | 1.8 |
| 77 | 1.6 |
| 50 | 1.5 |
+-------------+-------------+
%sql
CREATE OR REPLACE FUNCTION main.test.calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for weight_series, height_series in batch_iter:
yield weight_series / (height_series ** 2)
$$;
select main.test.calculate_bmi_pandas(cast(70 as double), cast(1.8 as double));
+--------+
| BMI |
+--------+
| 27.8 |
| 30.1 |
| 22.2 |
+--------+
Vedere Funzioni definite dall'utente (UDF) nel catalogo unity e funzioni definite dall'utente Batch Python nel catalogo unity.
Funzioni definite dall'utente non scalari
Le funzioni definite dall'utente non scalari operano su interi set di dati/colonne con rapporti di input/output flessibili (1:N o molti:molti).
Le funzioni definite dall'utente pandas batch con ambito sessione possono essere dei tipi seguenti:
- Serie a Serie
- Da iteratore di serie a iteratore di serie
- Iteratore di più serie in iteratore di serie
- Conversione da serie a scalare
Di seguito è riportato un esempio di Series to Series UDF (funzione definita dall'utente) di pandas.
from pyspark.sql.functions import pandas_udf
import pandas as pd
df = spark.createDataFrame([(70, 1.75), (80, 1.80), (60, 1.65)], ["Weight", "Height"])
@pandas_udf("double")
def calculate_bmi_pandas(weight: pd.Series, height: pd.Series) -> pd.Series:
return weight / (height ** 2)
df.withColumn("BMI", calculate_bmi_pandas(df["Weight"], df["Height"])).display()
Consulta le funzioni definite dall'utente pandas.
UDAF
Le funzioni di aggregazione definite dall'utente (UDAFs) operano su più righe e restituiscono un singolo risultato aggregato. Le aggregazioni definite dall'utente sono limitate all'ambito della sessione.
L'esempio UDAF seguente aggrega i punteggi in base alla lunghezza del nome.
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd
# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
return scores.sum()
# Group by name length and aggregate
result_df = (df.groupBy("name_length")
.agg(total_score_udf(df["score"]).alias("total_score")))
display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
| 3 | 70.0 |
| 4 | 40.0 |
| 5 | 40.0 |
+-------------+-------------+
Vedere funzioni pandas definite dall'utente per Python e funzioni di aggregazione definite dall'utente - Scala.
Funzioni definite dall'utente (UDTFs)
Una funzione tabella definita dall'utente accetta uno o più input e restituisce più righe (ed eventualmente più colonne) per ogni riga di input. Le UDTF sono limitate al contesto della sessione.
Nell'esempio seguente ogni valore nella colonna score corrisponde a un elenco di categorie. UDTF suddivide l'elenco delimitato da virgole in più righe.
+-------+-------+-----------------+
| name | score | categories |
+-------+-------+-----------------+
| alice | 10.0 | math,science |
| bob | 20.0 | history,math |
| carol | 30.0 | science,history |
| dave | 40.0 | math,art |
| eve | 50.0 | science,art |
+-------+-------+-----------------+
from pyspark.sql.functions import udtf
@udtf(returnType="name: string, score: int, category: string")
class ScoreCategoriesUDTF:
def eval(self, name: str, score: int, categories: str):
category_list = categories.split(',')
for category in category_list:
yield (name, score, category)
ScoreCategoriesUDTF(lit("Alice"), lit(85), lit("Math,Science,English")).display()
+-------+-------+----------+
| name | score | category |
+-------+-------+----------+
| alice | 10.0 | math |
| alice | 10.0 | science |
| bob | 20.0 | history |
| bob | 20.0 | math |
| carol | 30.0 | science |
| carol | 30.0 | history |
| dave | 40.0 | math |
| dave | 40.0 | art |
| eve | 50.0 | science |
| eve | 50.0 | art |
+-------+-------+----------+
Vedere Funzioni di tabella definite dall'utente python (UDF) .
Funzioni definite dall'utente regolamentate da Unity Catalog e con ambito sessione
Le funzioni definite dall'utente Python del Catalogo Unity e le funzioni batch definite dall'utente Python del Catalogo Unity sono persistenti nel Catalogo Unity per migliorare la governance, il riutilizzo e l'individuabilità. Tutte le altre funzioni definite dall'utente sono basate su sessioni, ovvero sono definite in un notebook o in un processo e sono limitate all'ambito dell'oggetto SparkSession corrente. È possibile definire e accedere alle UDF (funzioni definite dall'utente) con ambito di sessione usando Scala o Python.
Scheda di riferimento sulle funzioni definite dall'utente regolamentate da Unity Catalog
Le funzioni utente controllate di Unity Catalog consentono di definire, usare, gestire e condividere in modo sicuro funzioni personalizzate negli ambienti di elaborazione. Si veda Funzioni definite dall'utente (UDF) nel catalogo Unity.
Tipo UDF | Calcolo supportato | Descrizione |
---|---|---|
Catalogo Unity UDF Python |
|
Definire una funzione definita dall'utente in Python e registrarla nel catalogo Unity per la governance. Le funzioni scalari definite dall'utente operano su una singola riga e restituiscono un singolo valore per ogni riga. |
Funzioni definite dall'utente python per il catalogo di Batch Unity |
|
Definire una funzione definita dall'utente in Python e registrarla nel catalogo Unity per la governance. Operazioni batch su più valori per restituire più valori. Riduce il sovraccarico delle operazioni di riga per riga per l'elaborazione dei dati su larga scala. |
Foglio informativo sulle funzioni definite dall'utente con ambito sessione per il calcolo isolato dall'utente
Le funzioni definite dall'utente con ambito sessione sono definite in un notebook o in un processo e hanno come ambito la sessione SparkSession corrente. È possibile definire e accedere alle UDF (funzioni definite dall'utente) con ambito di sessione usando Scala o Python.
Tipo UDF | Calcolo supportato | Descrizione |
---|---|---|
Python scalare |
|
Le funzioni scalari definite dall'utente operano su una singola riga e restituiscono un singolo valore per ogni riga. |
Python non scalare |
|
Le funzioni definite dall'utente (UDF) non scalari includono pandas_udf , mapInPandas , mapInArrow , applyInPandas . Le funzioni definite dall'utente Pandas usano Apache Arrow per trasferire dati e pandas per lavorare con i dati. Le funzioni definite dall'utente di Pandas supportano operazioni vettorializzate che possono aumentare notevolmente le prestazioni rispetto alle funzioni scalari definite dall'utente eseguite riga per riga. |
UDF Python |
|
Una funzione tabella definita dall'utente accetta uno o più input e restituisce più righe (ed eventualmente più colonne) per ogni riga di input. |
Funzioni definite dall'utente scalari |
|
Le funzioni scalari definite dall'utente operano su una singola riga e restituiscono un singolo valore per ogni riga. |
Funzioni di aggregazione definite dall'utente di Scala UDAFs |
|
Le funzioni di aggregazione definite dall'utente (UDAFs) operano su più righe e restituiscono un singolo risultato aggregato. |
Considerazioni sulle prestazioni
Le funzioni predefinite e le funzioni definite dall'utente SQL sono le opzioni più efficienti.
Le funzioni definite dall'utente Scala sono in genere più veloci rispetto alle funzioni definite dall'utente Python.
- Le funzioni definite dall'utente Scala isolate vengono eseguite nella JVM (Java Virtual Machine), in modo da evitare il sovraccarico dello spostamento e dell'uscita dei dati dalla JVM.
- Le funzioni definite dall'utente Scala isolate devono spostare i dati all'interno e all'esterno della JVM, ma possono comunque essere più veloci rispetto alle funzioni definite dall'utente Python perché gestiscono la memoria in modo più efficiente.
Le Python UDFs e le pandas UDFs tendono a essere più lente rispetto alle Scala UDFs perché devono serializzare i dati e spostarli dalla JVM all'interprete Python.
- Le UDF Pandas sono fino a 100 volte più veloci rispetto alle UDF Python perché usano Apache Arrow per ridurre i costi di serializzazione.