Función escalar definida por el usuario - Python
Este artículo contiene ejemplos de funciones definidas por el usuario (UDF) de Python. Muestra cómo registrar UDF, cómo invocar UDF y advertencias con respecto al orden de evaluación de las subexpresiones en Spark SQL.
En Databricks Runtime 14.0 y versiones posteriores, puede usar funciones de tabla definidas por el usuario (UDTF) de Python para registrar funciones que devuelven relaciones completas en lugar de valores escalares. Consulta Funciones de tabla definidas por el usuario de Python (UDTF).
Nota:
En Databricks Runtime 12.2 LTS y versiones posteriores, las UDF de Python y las UDF de Pandas no se admiten en el proceso de Catálogo de Unity que usa el modo de acceso compartido. Las UDF escalares de Python y las UDF de Pandas se admiten en Databricks Runtime 13.3 LTS y versiones posteriores para todos los modos de acceso.
En Databricks Runtime 13.3 LTS y versiones posteriores, puede registrar UDF escalares de Python en el catálogo de Unity mediante la sintaxis SQL. Consulte Funciones definidas por el usuario (UDF) en Unity Catalog.
Registro de una función como UDF
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
Opcionalmente, puede establecer el tipo de valor devuelto de la UDF. El tipo de valor devuelto predeterminado es StringType
.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Llamada a la UDF en Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
Uso de UDF con DataFrames
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Como alternativa, puede declarar la misma UDF mediante la sintaxis de anotación:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Orden de evaluación y comprobación de valores NULL
Spark SQL (incluidas SQL y las DataFrame API y Dataset API) no garantiza el orden de evaluación de las subexpresiones. En concreto, las entradas de un operador o función no se evalúan necesariamente de izquierda a derecha ni en ningún otro orden fijo. Por ejemplo, las expresiones lógicas AND
y OR
no tienen semántica de "cortocircuito" de izquierda a derecha.
Por lo tanto, es peligroso basarse en los efectos secundarios o el orden de evaluación de las expresiones booleanas y el orden de las cláusulas WHERE
y HAVING
, ya que estas expresiones y cláusulas se pueden reordenar durante la optimización y el planeamiento de consultas. En concreto, si una UDF se basa en la semántica de cortocircuito en SQL para la comprobación de valores NULL, no hay ninguna garantía de que se realizará la comprobación de valores NULL antes de invocar la UDF. Por ejemplo,
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
Esta cláusula WHERE
no garantiza que se invoque la UDF strlen
después de filtrar los valores NULL.
Para realizar una comprobación correcta de los valores NULL, se recomienda realizar una de las siguientes acciones:
- Hacer que la UDF tenga en cuenta los valores NULL y realizar la comprobación de valores NULL dentro de la propia UDF
- Uso de expresiones
IF
oCASE WHEN
para realizar la comprobación de valores NULL e invocar la UDF en una rama condicional
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Limitaciones
- Las UDF de PySpark en clústeres compartidos o proceso sin servidor no pueden acceder a carpetas de Git, archivos de área de trabajo o volúmenes UC para importar módulos en Databricks Runtime 14.2 y versiones posteriores.