Note
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de changer d’annuaire.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de changer d’annuaire.
Cet article contient des exemples de fonctions définies par l’utilisateur (UDF) Python. Il montre comment inscrire et appeler des fonctions définies par l’utilisateur, et il fournit des avertissements concernant l’ordre d’évaluation des sous-expressions dans Spark SQL.
Dans Databricks Runtime 14.0 et versions ultérieures, vous pouvez utiliser des fonctions de table définies par l’utilisateur (UDTF) Python pour inscrire des fonctions qui retournent des relations entières au lieu de valeurs scalaires. Consultez Fonctions de table définies par l’utilisateur (UDTF) Python.
Remarque
Dans Databricks Runtime 12.2 LTS et versions antérieures, les fonctions définies par l’utilisateur Python et Pandas ne sont pas prises en charge sur le calcul Unity Catalog qui utilise le mode d’accès standard. Les fonctions scalaires définies par l’utilisateur Python et Pandas sont prises en charge dans Databricks Runtime 13.3 LTS et versions ultérieures pour tous les modes d’accès.
Dans Databricks Runtime 13.3 LTS et versions ultérieures, vous pouvez enregistrer des fonctions scalaires Python UDF dans Unity Catalog en utilisant la syntaxe SQL. Consultez les Fonctions définies par l’utilisateur (UDF) dans Unity Catalog.
Inscrire une fonction en tant que fonction définie par l’utilisateur
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
Vous pouvez éventuellement définir le type de retour de votre fonction définie par l’utilisateur. Le type de retour par défaut est StringType.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Appeler l'UDF dans Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
Utiliser UDF avec des DataFrames
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Vous pouvez également déclarer la même fonction définie par l’utilisateur à l’aide de la syntaxe d’annotation :
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Variantes avec UDF
Le type PySpark pour variant est VariantType et les valeurs sont de type VariantVal. Pour plus d’informations sur les variantes, consultez Les données de variante de requête.
from pyspark.sql.types import VariantType
# Return Variant
@udf(returnType = VariantType())
def toVariant(jsonString):
return VariantVal.parseJson(jsonString)
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toVariant(col("json"))).display()
+---------------+
|toVariant(json)|
+---------------+
| {"a":1}|
+---------------+
# Return Struct<Variant>
@udf(returnType = StructType([StructField("v", VariantType(), True)]))
def toStructVariant(jsonString):
return {"v": VariantVal.parseJson(jsonString)}
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toStructVariant(col("json"))).display()
+---------------------+
|toStructVariant(json)|
+---------------------+
| {"v":{"a":1}}|
+---------------------+
# Return Array<Variant>
@udf(returnType = ArrayType(VariantType()))
def toArrayVariant(jsonString):
return [VariantVal.parseJson(jsonString)]
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+--------------------+
|toArrayVariant(json)|
+--------------------+
| [{"a":1}]|
+--------------------+
# Return Map<String, Variant>
@udf(returnType = MapType(StringType(), VariantType(), True))
def toArrayVariant(jsonString):
return {"v1": VariantVal.parseJson(jsonString), "v2": VariantVal.parseJson("[" + jsonString + "]")}
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+-----------------------------+
| toArrayVariant(json)|
+-----------------------------+
|{"v2":[{"a":1}],"v1":{"a":1}}|
+-----------------------------+
Ordre d’évaluation et vérification de nullité
Spark SQL (incluant SQL et les API TrameDonnées et Jeu de données) ne garantit pas l’ordre d’évaluation des sous-expressions. En particulier, les entrées d’un opérateur ou d’une fonction ne sont pas nécessairement évaluées de gauche à droite ou dans tout autre ordre fixe. Par exemple, les expressions logiques AND et OR n’ont pas de sémantique de « court-circuit » de gauche à droite.
Il est donc dangereux de s’appuyer sur les effets secondaires ou l’ordre d’évaluation des expressions booléennes, ainsi que sur l’ordre des clauses WHERE et HAVING, car ces expressions et clauses peuvent être réordonnées lors de l’optimisation et de la planification des requêtes. Plus précisément, si une fonction UDF s’appuie sur la sémantique de court-circuitage dans SQL pour la vérification null, il n’existe aucune garantie que la vérification null se produit avant d’appeler la fonction UDF. Par exemple,
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
Cette clause WHERE ne garantit pas que la fonction définie par l’utilisateur strlen soit appelée après le filtrage des valeurs nulles.
Pour effectuer une vérification de valeur null correcte, nous vous recommandons d’effectuer l’une des opérations suivantes :
- Rendez la fonction définie par l’utilisateur sensible aux valeurs nulles et effectuez une vérification de valeurs nulles au sein de la fonction définie par l’utilisateur
- Utilisez les expressions
IFouCASE WHENpour effectuer la vérification de valeurs nulles et appelez la fonction définie par l’utilisateur dans une branche conditionnelle
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Informations d’identification du service dans les fonctions définies par l’utilisateur Python Scalar
Les fonctions UDF Python scalaires peuvent utiliser les informations d'identification du service Unity Catalog pour accéder de manière sécurisée aux services cloud externes. Cela est utile pour intégrer des opérations telles que la tokenisation basée sur le cloud, le chiffrement ou la gestion des secrets directement dans vos transformations de données.
Les informations d’identification du service pour les UDF scalaires Python sont uniquement prises en charge dans les entrepôts SQL et le calcul général.
Remarque
Les informations d’identification du service dans les fonctions UDF (User-Defined Functions) Scala Python, nécessitent Databricks Runtime 17.1 et versions ultérieures.
Pour créer des informations d’identification de service, consultez Créer des informations d’identification de service.
Remarque
API spécifique à la fonction UDF pour les informations d’identification du service :
Dans les FDUs, utilisez databricks.service_credentials.getServiceCredentialsProvider() pour accéder aux identifiants de service.
Cela diffère de la dbutils.credentials.getServiceCredentialsProvider() fonction utilisée dans les notebooks, qui n’est pas disponible dans les contextes d’exécution UDF.
Pour accéder aux informations d’identification du service, utilisez l’utilitaire databricks.service_credentials.getServiceCredentialsProvider() dans votre logique UDF pour initialiser les kits SDK cloud avec les informations d’identification appropriées. Tout le code doit être encapsulé dans le corps UDF.
@udf
def use_service_credential():
from azure.mgmt.web import WebSiteManagementClient
# Assuming there is a service credential named 'testcred' set up in Unity Catalog
web_client = WebSiteManagementClient(subscription_id, credential = getServiceCredentialsProvider('testcred'))
# Use web_client to perform operations
Autorisations des informations d’identification du service
Le créateur de la fonction UDF doit disposer de l’autorisation ACCESS sur le service Unity Catalog.
Les fonctions définies par l'utilisateur qui s'exécutent dans la portée No-PE, également appelées clusters dédiés, nécessitent des autorisations MANAGE sur les identifiants de service.
Informations d’identification par défaut
Lorsqu'elle est utilisée dans les UDFs Scalaire Python, Databricks utilise automatiquement l'identifiant par défaut du service à partir de la variable d'environnement de calcul. Ce comportement vous permet de référencer en toute sécurité des services externes sans gérer explicitement les alias d'identifiants dans votre code UDF. Voir Spécifier des informations d’identification de service par défaut pour une ressource de calcul
La prise en charge des informations d’identification par défaut est disponible uniquement dans les clusters en mode d’accès Standard et Dédié. Il n’est pas disponible dans DBSQL.
Vous devez installer le azure-identity package pour utiliser le DefaultAzureCredential fournisseur. Pour installer le package, consultez bibliothèques Python étendues au notebook ou bibliothèques délimitées par le calcul.
@udf
def use_service_credential():
from azure.identity import DefaultAzureCredential
from azure.mgmt.web import WebSiteManagementClient
# DefaultAzureCredential is automatically using the default service credential for the compute
web_client_default = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)
# Use web_client to perform operations
Obtenir le contexte d’exécution des tâches
Utilisez l’API TaskContext PySpark pour obtenir des informations contextuelles telles que l’identité de l’utilisateur, les balises de cluster, l’ID de travail Spark et bien plus encore. Consultez Obtenir le contexte de tâche dans une fonction UDF.
Limites
Les limitations suivantes s'appliquent aux UDF PySpark :
Restrictions d’accès aux fichiers : Sur Databricks Runtime 14.2 et versions antérieures, les UDF PySpark sur les clusters partagés ne peuvent pas accéder aux dossiers Git, aux fichiers d’espace de travail ou aux volumes de catalogue Unity.
Variables de diffusion : les fonctions définies par l’utilisateur PySpark sur les clusters en mode d’accès standard et le calcul serverless ne prennent pas en charge les variables de diffusion.
Informations d’identification du service : Les informations d’identification du service sont disponibles uniquement dans les UDF Python du catalogue Batch Unity et les UDF Python scalaires. Ils ne sont pas pris en charge dans les fonctions définies par l’utilisateur Python du catalogue standard Unity.
Informations d’identification du service : les informations d’identification du service sont disponibles uniquement dans le calcul serverless lors de l’utilisation de l’environnement serverless version 3 ou ultérieure. Consultez les Versions de l’environnement serverless.
- Limite de mémoire sur le calcul sans serveur : les UDF PySpark en environnement sans serveur ont une limite de mémoire de 1 Go par fonction. Le dépassement de cette limite entraîne une erreur de type UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT_SERVERLESS.
- Limite de mémoire en mode d’accès standard : les UDF PySpark en mode d’accès standard ont une limite de mémoire en fonction de la mémoire disponible sur le type d’instance choisi. Le dépassement de la mémoire disponible entraîne une erreur de type UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT.