Condividi tramite


Che cosa sono le funzioni definite dall'utente ??

Le funzioni definite dall'utente consentono di riutilizzare e condividere codice che estende le funzionalità predefinite in Azure Databricks. Usare le funzioni definite dall'utente (UDF) per eseguire attività specifiche, ad esempio calcoli complessi, trasformazioni o manipolazioni di dati personalizzate.

Quando usare una funzione UDF e Apache Spark?

Usare funzioni definite dall'utente per la logica difficile da esprimere con le funzioni predefinite di Apache Spark. Le funzioni apache Spark predefinite sono ottimizzate per l'elaborazione distribuita e offrono prestazioni migliori su larga scala. Per ulteriori informazioni, vedere la sezione Funzioni.

Databricks consiglia l'utilizzo delle UDF (funzioni definite dall'utente) per le query ad hoc, la pulizia manuale dei dati, l'analisi esplorativa dei dati e le operazioni su set di dati di dimensioni piccole o medie. I casi d'uso comuni per le funzioni definite dall'utente includono crittografia dei dati, decrittografia, hashing, analisi JSON e convalida.

Usare i metodi Apache Spark per le operazioni su set di dati di grandi dimensioni e tutti i carichi di lavoro eseguiti regolarmente o continuamente, inclusi processi ETL e operazioni di streaming.

Comprendere i tipi di UDF (funzioni definite dall'utente)

Seleziona un tipo UDF dalle schede seguenti per visualizzare una descrizione, un esempio e un collegamento per ulteriori informazioni.

Funzione definita dall'utente scalare

Le funzioni scalari definite dall'utente operano su una singola riga e restituiscono un singolo valore per ogni riga. Possono essere regolati dal catalogo Unity o con ambito sessione.

Nell'esempio seguente viene usata una funzione definita dall'utente scalare per calcolare la lunghezza di ogni nome in una name colonna e aggiungere il valore in una nuova colonna name_length.

+-------+-------+
| name  | score |
+-------+-------+
| alice |  10.0 |
| bob   |  20.0 |
| carol |  30.0 |
| dave  |  40.0 |
| eve   |  50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION main.test.get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);

-- Use the UDF in a SQL query
SELECT name, main.test.get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name  | score | name_length |
+-------+-------+-------------+
| alice |  10.0 |      5      |
|  bob  |  20.0 |      3      |
| carol |  30.0 |      5      |
| dave  |  40.0 |      4      |
|  eve  |  50.0 |      3      |
+-------+-------+-------------+

Per implementare questa funzionalità in un notebook di Databricks usando PySpark:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(returnType=IntegerType())
def get_name_length(name):
  return len(name)

df = df.withColumn("name_length", get_name_length(df.name))

# Show the result
display(df)

Vedere Funzioni definite dall'utente (UDF) nel catalogo unity e funzioni scalari definite dall'utente - Python.

Funzioni scalari definite dall'utente in modalità batch

Elaborare i dati in batch mantenendo la parità di riga di input/output 1:1. In questo modo si riduce il sovraccarico delle operazioni di riga per riga per l'elaborazione dei dati su larga scala. Le funzioni definite dall'utente batch mantengono anche lo stato tra batch per eseguire in modo più efficiente, riutilizzare le risorse e gestire calcoli complessi che necessitano di contesto tra blocchi di dati.

Possono essere regolati dal catalogo Unity o con ambito sessione.

La seguente funzione definita dall'utente Python del catalogo Batch Unity calcola l'IMC mentre elabora serie di righe:

+-------------+-------------+
| weight_kg   | height_m    |
+-------------+-------------+
|      90     |     1.8     |
|      77     |     1.6     |
|      50     |     1.5     |
+-------------+-------------+
%sql
CREATE OR REPLACE FUNCTION main.test.calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for weight_series, height_series in batch_iter:
    yield weight_series / (height_series ** 2)
$$;

select main.test.calculate_bmi_pandas(cast(70 as double), cast(1.8 as double));
+--------+
|  BMI   |
+--------+
|  27.8  |
|  30.1  |
|  22.2  |
+--------+

Vedere Funzioni definite dall'utente (UDF) nel catalogo unity e funzioni definite dall'utente Batch Python nel catalogo unity.

Funzioni definite dall'utente non scalari

Le funzioni definite dall'utente non scalari operano su interi set di dati/colonne con rapporti di input/output flessibili (1:N o molti:molti).

Le funzioni definite dall'utente pandas batch con ambito sessione possono essere dei tipi seguenti:

  • Serie a Serie
  • Da iteratore di serie a iteratore di serie
  • Iteratore di più serie in iteratore di serie
  • Conversione da serie a scalare

Di seguito è riportato un esempio di Series to Series UDF (funzione definita dall'utente) di pandas.

from pyspark.sql.functions import pandas_udf
import pandas as pd

df = spark.createDataFrame([(70, 1.75), (80, 1.80), (60, 1.65)], ["Weight", "Height"])

@pandas_udf("double")
def calculate_bmi_pandas(weight: pd.Series, height: pd.Series) -> pd.Series:
  return weight / (height ** 2)

df.withColumn("BMI", calculate_bmi_pandas(df["Weight"], df["Height"])).display()

Consulta le funzioni definite dall'utente pandas.

UDAF

Le funzioni di aggregazione definite dall'utente (UDAFs) operano su più righe e restituiscono un singolo risultato aggregato. Le aggregazioni definite dall'utente sono limitate all'ambito della sessione.

L'esempio UDAF seguente aggrega i punteggi in base alla lunghezza del nome.

from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd

# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
  return scores.sum()

# Group by name length and aggregate
result_df = (df.groupBy("name_length")
  .agg(total_score_udf(df["score"]).alias("total_score")))

display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
|      3      |     70.0    |
|      4      |     40.0    |
|      5      |     40.0    |
+-------------+-------------+

Vedere funzioni pandas definite dall'utente per Python e funzioni di aggregazione definite dall'utente - Scala.

Funzioni definite dall'utente (UDTFs)

Una funzione tabella definita dall'utente accetta uno o più input e restituisce più righe (ed eventualmente più colonne) per ogni riga di input. Le UDTF sono limitate al contesto della sessione.

Nell'esempio seguente ogni valore nella colonna score corrisponde a un elenco di categorie. UDTF suddivide l'elenco delimitato da virgole in più righe.

+-------+-------+-----------------+
| name  | score |   categories    |
+-------+-------+-----------------+
| alice |  10.0 |  math,science   |
|  bob  |  20.0 |  history,math   |
| carol |  30.0 | science,history |
| dave  |  40.0 |    math,art     |
|  eve  |  50.0 |  science,art    |
+-------+-------+-----------------+
from pyspark.sql.functions import udtf

@udtf(returnType="name: string, score: int, category: string")
class ScoreCategoriesUDTF:
  def eval(self, name: str, score: int, categories: str):
    category_list = categories.split(',')
    for category in category_list:
      yield (name, score, category)

ScoreCategoriesUDTF(lit("Alice"), lit(85), lit("Math,Science,English")).display()
+-------+-------+----------+
| name  | score | category |
+-------+-------+----------+
| alice |  10.0 |   math   |
| alice |  10.0 | science  |
|  bob  |  20.0 | history  |
|  bob  |  20.0 |   math   |
| carol |  30.0 | science  |
| carol |  30.0 | history  |
| dave  |  40.0 |   math   |
| dave  |  40.0 |   art    |
|  eve  |  50.0 | science  |
|  eve  |  50.0 |   art    |
+-------+-------+----------+

Vedere Funzioni di tabella definite dall'utente python (UDF) .

Funzioni definite dall'utente regolamentate da Unity Catalog e con ambito sessione

Le funzioni definite dall'utente Python del Catalogo Unity e le funzioni batch definite dall'utente Python del Catalogo Unity sono persistenti nel Catalogo Unity per migliorare la governance, il riutilizzo e l'individuabilità. Tutte le altre funzioni definite dall'utente sono basate su sessioni, ovvero sono definite in un notebook o in un processo e sono limitate all'ambito dell'oggetto SparkSession corrente. È possibile definire e accedere alle UDF (funzioni definite dall'utente) con ambito di sessione usando Scala o Python.

Scheda di riferimento sulle funzioni definite dall'utente regolamentate da Unity Catalog

Le funzioni utente controllate di Unity Catalog consentono di definire, usare, gestire e condividere in modo sicuro funzioni personalizzate negli ambienti di elaborazione. Si veda Funzioni definite dall'utente (UDF) nel catalogo Unity.

Tipo UDF Calcolo supportato Descrizione
Catalogo Unity UDF Python
  • Serverless notebook e attività
  • Calcolo classico con modalità di accesso standard (Databricks Runtime 13.3 LTS e versioni successive)
  • Sql Warehouse (serverless, pro e classic)
  • Pipeline dichiarative di Lakeflow (versione classica e serverless)
Definire una funzione definita dall'utente in Python e registrarla nel catalogo Unity per la governance.
Le funzioni scalari definite dall'utente operano su una singola riga e restituiscono un singolo valore per ogni riga.
Funzioni definite dall'utente python per il catalogo di Batch Unity
  • Serverless notebook e attività
  • Calcolo classico con modalità di accesso standard (Databricks Runtime 16.3 e versioni successive)
  • magazzino SQL (senza server, versione pro e classica)
Definire una funzione definita dall'utente in Python e registrarla nel catalogo Unity per la governance.
Operazioni batch su più valori per restituire più valori. Riduce il sovraccarico delle operazioni di riga per riga per l'elaborazione dei dati su larga scala.

Foglio informativo sulle funzioni definite dall'utente con ambito sessione per il calcolo isolato dall'utente

Le funzioni definite dall'utente con ambito sessione sono definite in un notebook o in un processo e hanno come ambito la sessione SparkSession corrente. È possibile definire e accedere alle UDF (funzioni definite dall'utente) con ambito di sessione usando Scala o Python.

Tipo UDF Calcolo supportato Descrizione
Python scalare
  • Serverless notebook e attività
  • Calcolo classico con modalità di accesso standard (Databricks Runtime 13.3 LTS e versioni successive)
  • Pipeline dichiarative di Lakeflow (versione classica e serverless)
Le funzioni scalari definite dall'utente operano su una singola riga e restituiscono un singolo valore per ogni riga.
Python non scalare
  • Serverless notebook e attività
  • Calcolo classico con modalità di accesso standard (Databricks Runtime 14.3 LTS e versioni successive)
  • Pipeline dichiarative di Lakeflow (versione classica e serverless)
Le funzioni definite dall'utente (UDF) non scalari includono pandas_udf, mapInPandas, mapInArrow, applyInPandas. Le funzioni definite dall'utente Pandas usano Apache Arrow per trasferire dati e pandas per lavorare con i dati. Le funzioni definite dall'utente di Pandas supportano operazioni vettorializzate che possono aumentare notevolmente le prestazioni rispetto alle funzioni scalari definite dall'utente eseguite riga per riga.
UDF Python
  • Serverless notebook e attività
  • Calcolo classico con modalità di accesso standard (Databricks Runtime 14.3 LTS e versioni successive)
  • Pipeline dichiarative di Lakeflow (versione classica e serverless)
Una funzione tabella definita dall'utente accetta uno o più input e restituisce più righe (ed eventualmente più colonne) per ogni riga di input.
Funzioni definite dall'utente scalari
  • Calcolo classico con modalità di accesso standard (Databricks Runtime 13.3 LTS e versioni successive)
Le funzioni scalari definite dall'utente operano su una singola riga e restituiscono un singolo valore per ogni riga.
Funzioni di aggregazione definite dall'utente di Scala UDAFs
  • Calcolo classico con modalità di accesso standard (Databricks Runtime 14.2 LTS e versioni successive)
Le funzioni di aggregazione definite dall'utente (UDAFs) operano su più righe e restituiscono un singolo risultato aggregato.

Considerazioni sulle prestazioni

  • Le funzioni predefinite e le funzioni definite dall'utente SQL sono le opzioni più efficienti.

  • Le funzioni definite dall'utente Scala sono in genere più veloci rispetto alle funzioni definite dall'utente Python.

    • Le funzioni definite dall'utente Scala isolate vengono eseguite nella JVM (Java Virtual Machine), in modo da evitare il sovraccarico dello spostamento e dell'uscita dei dati dalla JVM.
    • Le funzioni definite dall'utente Scala isolate devono spostare i dati all'interno e all'esterno della JVM, ma possono comunque essere più veloci rispetto alle funzioni definite dall'utente Python perché gestiscono la memoria in modo più efficiente.
  • Le Python UDFs e le pandas UDFs tendono a essere più lente rispetto alle Scala UDFs perché devono serializzare i dati e spostarli dalla JVM all'interprete Python.

    • Le UDF Pandas sono fino a 100 volte più veloci rispetto alle UDF Python perché usano Apache Arrow per ridurre i costi di serializzazione.