Compartir a través de


¿Qué son las funciones definidas por el usuario (UDF)?

Las funciones definidas por el usuario (UDF) permiten reutilizar y compartir código que amplía la funcionalidad integrada en Azure Databricks. Use UDF para realizar tareas específicas, como cálculos complejos, transformaciones o manipulaciones de datos personalizadas.

¿Cuándo usar una función de UDF frente a Apache Spark?

Use UDF para la lógica que es difícil de expresar con funciones integradas de Apache Spark. Las funciones integradas de Apache Spark están optimizadas para el procesamiento distribuido y ofrecen un mejor rendimiento a escala. Para obtener más información, vea Funciones.

Databricks recomienda UDF para consultas ad hoc, limpieza manual de datos, análisis exploratorio de datos y operaciones en conjuntos de datos pequeños a medianos. Entre los casos de uso comunes para las UDF se incluyen el cifrado de datos, el descifrado, el hash, el análisis de JSON y la validación.

Utiliza métodos de Apache Spark para operaciones en conjuntos de datos muy grandes y para las cargas de trabajo que se ejecutan periódica o continuamente, incluidos los trabajos ETL y las operaciones de streaming.

Descripción de los tipos de UDF

Seleccione un tipo de UDF en las pestañas siguientes para ver una descripción, un ejemplo y un vínculo para obtener más información.

Función definida por el usuario (UDF) escalar

Las UDF escalares funcionan en una sola fila y devuelven un único valor de resultado para cada fila. Pueden estar regidos por el catálogo de Unity o con ámbito de sesión.

En el ejemplo siguiente se usa una UDF escalar para calcular la longitud de cada nombre de una name columna y agregar el valor en una nueva columna name_length.

+-------+-------+
| name  | score |
+-------+-------+
| alice |  10.0 |
| bob   |  20.0 |
| carol |  30.0 |
| dave  |  40.0 |
| eve   |  50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION main.test.get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);

-- Use the UDF in a SQL query
SELECT name, main.test.get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name  | score | name_length |
+-------+-------+-------------+
| alice |  10.0 |      5      |
|  bob  |  20.0 |      3      |
| carol |  30.0 |      5      |
| dave  |  40.0 |      4      |
|  eve  |  50.0 |      3      |
+-------+-------+-------------+

Para implementarlo en un cuaderno de Databricks mediante PySpark:

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

@udf(returnType=IntegerType())
def get_name_length(name):
  return len(name)

df = df.withColumn("name_length", get_name_length(df.name))

# Show the result
display(df)

Consulte Funciones definidas por el usuario (UDF) en el catálogo de Unity y funciones escalares definidas por el usuario: Python.

UDF escalares por lotes

Procese los datos en lotes mientras mantiene la paridad de filas de entrada y salida 1:1. Esto reduce la sobrecarga de las operaciones de fila por fila para el procesamiento de datos a gran escala. Las UDF por lotes también mantienen el estado entre lotes para ejecutarse de forma más eficaz, reutilizar recursos y controlar cálculos complejos que necesitan contexto entre fragmentos de datos.

Pueden estar regidos por el catálogo de Unity o con ámbito de sesión.

La siguiente UDF de Python del Catálogo de Unity para Lotes calcula el IMC al procesar lotes de filas:

+-------------+-------------+
| weight_kg   | height_m    |
+-------------+-------------+
|      90     |     1.8     |
|      77     |     1.6     |
|      50     |     1.5     |
+-------------+-------------+
%sql
CREATE OR REPLACE FUNCTION main.test.calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple

def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
  for weight_series, height_series in batch_iter:
    yield weight_series / (height_series ** 2)
$$;

select main.test.calculate_bmi_pandas(cast(70 as double), cast(1.8 as double));
+--------+
|  BMI   |
+--------+
|  27.8  |
|  30.1  |
|  22.2  |
+--------+

Consulte Funciones definidas por el usuario (UDF) en el catálogo de Unity y funciones definidas por el usuario (UDF) de Python de Batch en el catálogo de Unity.

UDF no escalares

Las UDF no escalares funcionan en conjuntos de datos o columnas completos con relaciones flexibles de entrada y salida (1:N o varios:many).

Las UDF por lotes de Pandas de ámbito de sesión pueden ser de los siguientes tipos:

  • Serie a serie
  • Iterador de series a iterador de series
  • Iterador de varias series a iterador de serie
  • Serie a escalar

A continuación se muestra un ejemplo de una UDF de Pandas Series to a series.

from pyspark.sql.functions import pandas_udf
import pandas as pd

df = spark.createDataFrame([(70, 1.75), (80, 1.80), (60, 1.65)], ["Weight", "Height"])

@pandas_udf("double")
def calculate_bmi_pandas(weight: pd.Series, height: pd.Series) -> pd.Series:
  return weight / (height ** 2)

df.withColumn("BMI", calculate_bmi_pandas(df["Weight"], df["Height"])).display()

Consulte las funciones definidas por el usuario de Pandas.

UDAF

Las UDAF funcionan en varias filas y devuelven un único resultado agregado. Las UDAF solo tienen ámbito de sesión.

En el siguiente ejemplo de UDAF se agregan puntuaciones por longitud de nombre.

from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd

# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
  return scores.sum()

# Group by name length and aggregate
result_df = (df.groupBy("name_length")
  .agg(total_score_udf(df["score"]).alias("total_score")))

display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
|      3      |     70.0    |
|      4      |     40.0    |
|      5      |     40.0    |
+-------------+-------------+

Consulte funciones definidas por el usuario de Pandas para Python y funciones de agregado definidas por el usuario: Scala.

UDTF

Un UDTF toma uno o varios argumentos de entrada y devuelve varias filas (y posiblemente varias columnas) para cada fila de entrada. Las UDF solo tienen ámbito de sesión.

En el ejemplo siguiente, cada valor de la columna de puntuación corresponde a una lista de categorías. El UDTF divide la lista separada por comas en varias filas.

+-------+-------+-----------------+
| name  | score |   categories    |
+-------+-------+-----------------+
| alice |  10.0 |  math,science   |
|  bob  |  20.0 |  history,math   |
| carol |  30.0 | science,history |
| dave  |  40.0 |    math,art     |
|  eve  |  50.0 |  science,art    |
+-------+-------+-----------------+
from pyspark.sql.functions import udtf

@udtf(returnType="name: string, score: int, category: string")
class ScoreCategoriesUDTF:
  def eval(self, name: str, score: int, categories: str):
    category_list = categories.split(',')
    for category in category_list:
      yield (name, score, category)

ScoreCategoriesUDTF(lit("Alice"), lit(85), lit("Math,Science,English")).display()
+-------+-------+----------+
| name  | score | category |
+-------+-------+----------+
| alice |  10.0 |   math   |
| alice |  10.0 | science  |
|  bob  |  20.0 | history  |
|  bob  |  20.0 |   math   |
| carol |  30.0 | science  |
| carol |  30.0 | history  |
| dave  |  40.0 |   math   |
| dave  |  40.0 |   art    |
|  eve  |  50.0 | science  |
|  eve  |  50.0 |   art    |
+-------+-------+----------+

Consulta Funciones de tabla definidas por el usuario de Python (UDTF).

UDF controladas por el catálogo de Unity frente a las UDF con ámbito de sesión

Las UDF de Python del catálogo de Unity y las UDF por lotes de Python del catálogo de Unity se guardan en el catálogo de Unity para mejorar la gobernanza, la reutilización y la capacidad de descubrimiento. Todas las demás UDF se basan en la sesión, lo que significa que se definen en un cuaderno o trabajo y tienen como ámbito la sparkSession actual. Puede definir y acceder a UDF con ámbito de sesión mediante Scala o Python.

Hoja de referencia rápida de UDF controladas por el catálogo de Unity

Las UDF reguladas por el catálogo de Unity permiten definir, usar, compartir y gobernar funciones personalizadas en entornos informáticos de forma segura. Consulte Funciones definidas por el usuario (UDF) en Unity Catalog.

Tipo de UDF Proceso admitido Descripción
UDF de Python del catálogo de Unity
  • Cuadernos y trabajos sin servidor
  • Proceso clásico con modo de acceso estándar (Databricks Runtime 13.3 LTS y versiones posteriores)
  • Almacenes de SQL (sin servidor, profesionales y clásicos)
  • Pipelines declarativas de Lakeflow (clásicas y sin servidor)
Defina una UDF en Python y regístrela en el Catálogo de Unity para la gobernanza.
Las UDF escalares funcionan en una sola fila y devuelven un único valor de resultado para cada fila.
UDF de Python por lotes en Unity Catalog
  • Cuadernos y trabajos sin servidor
  • Proceso clásico con modo de acceso estándar (Databricks Runtime 16.3 y versiones posteriores)
  • SQL Warehouse (sin servidor, profesional y clásico)
Defina una UDF en Python y regístrela en el Catálogo de Unity para la gobernanza.
Operaciones por lotes sobre varios valores devuelven varios valores. Reduce la sobrecarga de las operaciones de fila por fila para el procesamiento de datos a gran escala.

Hoja de referencia rápida de UDF con ámbito de sesión para cómputo aislado por el usuario

Las UDF con ámbito de sesión se definen en un cuaderno o trabajo y tienen como ámbito la sparkSession actual. Puede definir y acceder a UDF con ámbito de sesión mediante Scala o Python.

Tipo de UDF Proceso admitido Descripción
Python escalar
  • Cuadernos y trabajos sin servidor
  • Proceso clásico con modo de acceso estándar (Databricks Runtime 13.3 LTS y versiones posteriores)
  • Pipelines declarativas de Lakeflow (clásicas y sin servidor)
Las UDF escalares funcionan en una sola fila y devuelven un único valor de resultado para cada fila.
Python no escalar
  • Cuadernos y trabajos sin servidor
  • Proceso clásico con modo de acceso estándar (Databricks Runtime 14.3 LTS y versiones posteriores)
  • Pipelines declarativas de Lakeflow (clásicas y sin servidor)
Las UDF no escalares incluyen pandas_udf, mapInPandas, mapInArrow, , applyInPandas. Las UDF de Pandas usan Apache Arrow para transferir datos y pandas para trabajar con los datos. Las UDF de Pandas admiten operaciones vectorizadas que pueden aumentar enormemente el rendimiento sobre las UDF escalares de fila a fila.
UDTF de Python
  • Cuadernos y trabajos sin servidor
  • Proceso clásico con modo de acceso estándar (Databricks Runtime 14.3 LTS y versiones posteriores)
  • Pipelines declarativas de Lakeflow (clásicas y sin servidor)
Un UDTF toma uno o varios argumentos de entrada y devuelve varias filas (y posiblemente varias columnas) para cada fila de entrada.
UDF escalares de Scala
  • Proceso clásico con modo de acceso estándar (Databricks Runtime 13.3 LTS y versiones posteriores)
Las UDF escalares funcionan en una sola fila y devuelven un único valor de resultado para cada fila.
UDAF de Scala
  • Proceso clásico con modo de acceso estándar (Databricks Runtime 14.2 LTS y versiones posteriores)
Las UDAF funcionan en varias filas y devuelven un único resultado agregado.

Consideraciones sobre el rendimiento

  • Las funciones integradas y las UDF de SQL son las opciones más eficaces.

  • Las UDF de Scala suelen ser más rápidas que las UDF de Python.

    • Las UDF de Scala unisoladas se ejecutan en la máquina virtual Java (JVM), por lo que evitan la sobrecarga de mover datos dentro y fuera de la JVM.
    • Las UDF de Scala aisladas tienen que mover datos dentro y fuera de la JVM, pero aún pueden ser más rápidos que las UDF de Python porque controlan la memoria de forma más eficaz.
  • Las UDF de Python y las UDFde Pandas tienden a ser más lentas que las UDF de Scala porque necesitan serializar datos y moverlos fuera de la JVM al intérprete de Python.

    • Las UDF de Pandas son hasta 100 veces más rápidas que las UDF de Python porque usan Apache Arrow para reducir los costos de serialización.