Fonctions scalaires définies par l’utilisateur - Python
Cet article contient des exemples de fonctions définies par l’utilisateur (UDF) Python. Il montre comment inscrire des fonctions définies par l’utilisateur, appeler des fonctions définies par l’utilisateur et il fourni des avertissements concernant l’ordre d’évaluation des sous-expressions dans Spark SQL.
Dans Databricks Runtime 14.0 et versions ultérieures, vous pouvez utiliser des fonctions de table définies par l’utilisateur (UDTF) Python pour inscrire des fonctions qui retournent des relations entières au lieu de valeurs scalaires. Consultez Fonctions de table définies par l’utilisateur (UDTF) Python.
Remarque
Dans Databricks Runtime 12.2 LTS et ci-dessous, les fonctions définies par l’utilisateur Python et les fonctions définies par l’utilisateur Pandas ne sont pas prises en charge sur le calcul du catalogue Unity qui utilise le mode d’accès partagé. Les fonctions définies par l’utilisateur Python scalaire et pandas sont prises en charge dans Databricks Runtime 13.3 LTS et versions ultérieures pour tous les modes d’accès.
Dans Databricks Runtime 13.3 LTS et versions ultérieures, vous pouvez inscrire des fonctions scalaires Python définies par l’utilisateur dans Unity Catalog en utilisant la syntaxe SQL. Consultez les Fonctions définies par l’utilisateur (UDF) dans Unity Catalog.
Inscrire une fonction en tant que fonction définie par l’utilisateur
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
Vous pouvez éventuellement définir le type de retour de votre fonction définie par l’utilisateur. Le type de retour par défaut est StringType
.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Appeler la fonction définie par l’utilisateur dans Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
Utiliser une fonction définie par l’utilisateur avec des tramedonnées
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Vous pouvez également déclarer la même fonction définie par l’utilisateur à l’aide de la syntaxe d’annotation :
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Ordre d’évaluation et vérification de valeur null
Spark SQL (incluant SQL et les API TrameDonnées et Jeu de données) ne garantit pas l’ordre d’évaluation des sous-expressions. En particulier, les entrées d’un opérateur ou d’une fonction ne sont pas nécessairement évaluées de gauche à droite ou dans tout autre ordre fixe. Par exemple, les expressions logiques AND
et OR
n’ont pas de sémantique de « court-circuit » de gauche à droite.
Il est donc dangereux de s’appuyer sur les effets secondaires ou l’ordre d’évaluation des expressions booléennes, ainsi que sur l’ordre des clauses WHERE
et HAVING
, car ces expressions et clauses peuvent être réordonnées lors de l’optimisation et de la planification des requêtes. Plus précisément, si une fonction définie par l’utilisateur repose sur une sémantique de court-circuit dans SQL pour la vérification de valeur null, il n’est pas garanti que celle-ci intervienne avant l’appel de la fonction définie par l’utilisateur. Par exemple,
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
Cette clause WHERE
ne garantit pas que la fonction définie par l’utilisateur strlen
est appelée après le filtrage des valeurs null.
Pour effectuer une vérification de valeur null correcte, nous vous recommandons d’effectuer l’une des opérations suivantes :
- Rendez la fonction définie par l’utilisateur sensible aux valeurs null, et effectuez une vérification de valeur null dans la fonction définie par l’utilisateur
- Utilisez des expressions
IF
ouCASE WHEN
pour effectuer la vérification de valeur null et appeler la fonction définie par l’utilisateur dans une branche conditionnelle
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Limites
- Les fonctions UDF PySpark sur des clusters partagés ou le calcul serverless ne peuvent pas accéder aux dossiers Git, aux fichiers d’espace de travail ou aux volumes UC pour importer des modules sur Databricks Runtime 14.2 et versions inférieures.