Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Remarque
Cet article traite de Databricks Connect pour Databricks Runtime 13.3 et versions ultérieures.
Databricks Connect pour Python prend en charge les fonctions définies par l’utilisateur (UDF). Lorsqu’une opération DataFrame qui inclut des fonctions définies par l’utilisateur est exécutée, les fonctions définies par l’utilisateur sont sérialisées par Databricks Connect et envoyées au serveur dans le cadre de la requête.
Pour plus d’informations sur les UDF pour Databricks Connect pour Scala, consultez Fonctions définies par l’utilisateur dans Databricks Connect pour Scala.
Remarque
Étant donné que la fonction définie par l’utilisateur est sérialisée et désérialisée, la version Python du client doit correspondre à la version Python sur le calcul Azure Databricks. Pour connaître les versions prises en charge, consultez la matrice de prise en charge des versions.
Définir une UDF
Pour créer une fonction UDF dans Databricks Connect pour Python, utilisez l’une des fonctions prises en charge suivantes :
- Fonctions définies par l’utilisateur PySpark
- Fonctions de streaming PySpark
Par exemple, le script Python suivant configure une fonction UDF simple qui élève au carré les valeurs dans une colonne.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession
@udf(returnType=IntegerType())
def double(x):
return x * x
spark = DatabricksSession.builder.getOrCreate()
df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))
df.show()
Gérer les dépendances UDF
Importante
Cette fonctionnalité est disponible en préversion publique et nécessite Databricks Connect pour Python 16.4 ou version ultérieure, et un cluster exécutant Databricks Runtime 16.4 ou version ultérieure. Pour utiliser cette fonctionnalité, activez la préversion des UDF Python dans Unity Catalog dans votre espace de travail.
Databricks Connect prend en charge la spécification des dépendances Python requises pour les fonctions UDF. Ces dépendances sont installées sur la capacité de calcul Databricks dans le cadre de l’environnement Python des UDF.
Cette fonctionnalité permet aux utilisateurs de spécifier des dépendances dont l’UDF a besoin en plus des packages fournis dans l’environnement de base. Il peut également être utilisé pour installer une autre version du package à partir de ce qui est fourni dans l’environnement de base.
Les dépendances peuvent être installées à partir des sources suivantes :
- Packages PyPI
- Les packages PyPI peuvent être spécifiés en fonction du PEP 508, par exemple,
dicepyjokes<1ousimplejson==3.19.*.
- Les packages PyPI peuvent être spécifiés en fonction du PEP 508, par exemple,
- Packages stockés dans des volumes de catalogue Unity
- Les distributions générées (
.whl) et les distributions sources (.tar.gz) sont prises en charge. - Les packages de volumes catalogue Unity peuvent être spécifiés comme
dbfs:<path>, par exemple,dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whloudbfs:/Volumes/users/someone@example.com/tars/my_private_dep-4.0.0.tar.gz. - L’utilisateur doit avoir une autorisation
READ_FILEsur le fichier dans le volume re:[UC]. L’octroi de cette autorisation à tous les utilisateurs de compte active automatiquement cette autorisation pour les nouveaux utilisateurs.
- Les distributions générées (
- Packages, dossiers et fichiers Python locaux
- Les distributions générées locales (
.whl), les distributions sources (.tar.gz), les dossiers et les fichiers Python peuvent être spécifiés en tant quelocal:<path>, par exemple :local:/path/to/my_private_dep-3.20.2-py3-none-any.whl, ,local:/path/to/my_private_dep-4.0.0.tar.gz,local:/path/to/my_folderlocal:/path/to/my_file.py. - Les chemins absolus et relatifs sont pris en charge, par exemple :
local:/path/to/my_file.pyoulocal:./path/to/my_file.py.
- Les distributions générées locales (
Pour inclure des dépendances personnalisées dans votre fonction UDF, spécifiez-les dans un environnement à l’aide withDependenciesde cet environnement, puis utilisez cet environnement pour créer une session Spark. Les dépendances sont installées sur votre capacité de calcul Databricks et seront disponibles dans toutes les UDF qui utilisent cette session Spark.
Le code suivant déclare le package dice PyPI en tant que dépendance :
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Ou, pour spécifier une dépendance d’une roue dans un volume :
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Comportement dans les notebooks et tâches Databricks
Dans les notebooks et les travaux, les dépendances UDF doivent être installées directement dans l'environnement REPL. Databricks Connect valide l’environnement Python REPL en vérifiant que toutes les dépendances spécifiées sont déjà installées et lève une exception si elles ne sont pas installées. La validation de l’environnement de notebook s’exécute pour les dépendances de volume du catalogue PyPI et Unity, mais pas pour les dépendances locales.
Limites
- La prise en charge des dépendances UDF de
pyspark.sql.streaming.DataStreamWriter.foreachnécessite Databricks Connect pour Python 18.0 ou version ultérieure, ainsi qu'un cluster exécutant Databricks Runtime 18.0 ou version ultérieure. - La prise en charge des dépendances UDF
pyspark.sql.streaming.DataStreamWriter.foreachBatchnécessite Databricks Connect version 18.0 ou ultérieure pour Python, avec un cluster qui exécute Databricks Runtime 18.0 ou version ultérieure. La fonctionnalité n’est pas prise en charge sur serverless. - La prise en charge des dépendances UDF pour les packages, dossiers et fichiers Python locaux nécessite Databricks Connect pour Python 18.1 ou version ultérieure, et un cluster exécutant Databricks Runtime 18.1 ou version ultérieure.
- Les dépendances UDF ne sont pas prises en charge pour les fonctions UDF d’agrégation Pandas appliquées sur des fonctions de fenêtre.
- Les volumes du catalogue Unity et les packages locaux doivent être emballés conformément aux spécifications standard du packaging Python de PEP-427 ou ultérieures pour les distributions de type roue, et de PEP-241 ou ultérieures pour les distributions de source tar. Pour plus d’informations sur les normes d’empaquetage Python, consultez la documentation PyPA.
Exemples
L’exemple suivant définit des dépendances PyPI et volumes dans un environnement, crée une session avec cet environnement, puis définit et appelle des UDF qui utilisent ces dépendances :
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd
pypi_deps = ["pyjokes>=0.8,<1"]
volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0.tar.gz",
]
local_deps = [
# Example library from: https://pypi.org/project/simplejson/#files
"local:./test/simplejson-3.20.2-py3-none-any.whl",
]
env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps).withDependencies(local_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))
@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]
df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()
Gestion automatique des dépendances UDF
Importante
Cette fonctionnalité est disponible en préversion publique et nécessite Databricks Connect pour Python 18.1 ou version ultérieure, Python 3.12 sur votre ordinateur local et un cluster exécutant Databricks Runtime 18.1 ou version ultérieure. Pour utiliser cette fonctionnalité, activez la préversion des UDF Python dans Unity Catalog dans votre espace de travail.
L’API Databricks Connect withAutoDependencies() permet la découverte et le chargement automatiques des modules locaux et des dépendances PyPI publiques utilisées dans les instructions d’importation de vos UDF. Elle supprime la nécessité de spécifier manuellement des dépendances.
Le code suivant active la gestion automatique des dépendances :
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
La withAutoDependencies() méthode accepte les paramètres suivants :
-
upload_local: quand la valeur est définieTrue, les modules locaux importés dans vos fonctions définies par l’utilisateur sont automatiquement découverts, empaquetés et chargés dans le bac à sable UDF. -
use_index: Lorsqu'elles sont définies surTrue, les dépendances PyPI publiques utilisées dans vos fonctions définies par l'utilisateur sont automatiquement découvertes et installées sur l'infrastructure Azure Databricks. Le processus de découverte utilise les packages installés sur votre ordinateur local pour déterminer les versions, ce qui garantit la cohérence entre votre environnement local et l’environnement d’exécution à distance.
Limites
- Les importations dynamiques (par exemple,
importlib.import_module("foo")) ne sont pas prises en charge. - Les packages d’espace de noms (par exemple,
azure.eventhubetgoogle.cloud.aiplatform) ne sont pas pris en charge. - Les dépendances installées à l’aide de références d’URL directes ne sont pas prises en charge. Cela inclut ceux installés à partir de fichiers wheel locaux.
- Les dépendances installées à partir d’index de package privé ne sont pas prises en charge. Les packages installés de cette façon ne peuvent pas être distingués des packages installés à partir du PyPI public.
- La découverte de dépendances ne fonctionne pas dans un interpréteur de commandes Python. Seuls les scripts Python, l’interpréteur de commandes IPython et les notebooks Jupyter sont pris en charge.
Exemples
L’exemple suivant illustre la gestion automatique des dépendances avec les modules locaux et les packages PyPI. Cet exemple nécessite que vous ayez installé simplejson et dice (à l’aide de pip install simplejson dice).
Tout d’abord, créez des modules d’assistance locaux :
# my_helper.py
def double(x):
return 2 * x
# my_json.py
import simplejson
def loads(x):
return simplejson.loads(x)
def dumps(x):
return simplejson.dumps(x)
Ensuite, dans votre script principal, importez ces modules et utilisez-les dans les UDF.
# main.py
import dice as dc
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType
import my_json
from my_helper import double
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
return my_json.loads(my_json.dumps(double(x)))
@udf(returnType=FloatType())
def sum_and_add_noise(x, y):
return x + y + (dc.roll("d6")[0] / 6)
df = spark.range(1, 10)
df = df.withColumns({
"doubled": double_and_json_parse(col("id")),
"summed_with_noise": sum_and_add_noise(col("id"), col("doubled")),
})
df.show()
Logging
Pour générer des dépendances découvertes, définissez la variable d’environnement SPARK_CONNECT_LOG_LEVEL sur info ou debug. Vous pouvez également configurer l’infrastructure de journalisation Python :
import logging
logging.basicConfig(level=logging.INFO)
Les journaux pertinents sont émis par le module databricks.connect.auto_dependencies, par exemple :
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_json
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_helper
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: simplejson for module simplejson
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: dice for module dice
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_json
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_helper
INFO:databricks.connect.auto_dependencies.hook:Updated simplejson with auto-detected version ==3.20.2
INFO:databricks.connect.auto_dependencies.hook:Updated dice with auto-detected version ==4.0.0
Environnement de base Python
Les UDF sont exécutées sur la capacité de calcul Databricks et non sur le client. L'environnement Python de base dans lequel les UDF sont exécutées dépend de la capacité de calcul Databricks.
Pour les clusters, l’environnement Python de base est l’environnement Python de la version databricks Runtime s’exécutant sur le cluster. La version de Python et la liste des packages de cet environnement de base se trouvent sous l’environnement système et les sections bibliothèques Python installées des notes de publication databricks Runtime.
Pour le calcul serverless, l’environnement Python de base correspond à la version de l’environnement serverless en fonction du tableau suivant. Les versions de Databricks Connect non répertoriées dans ce tableau ne prennent pas encore en charge serverless ou ont atteint la fin de la prise en charge. Consultez la matrice de support des versions et les versions de Databricks Connect en fin de support.
| Version de Databricks Connect | Environnement sans serveur UDF |
|---|---|
| 18.0, Python 3.12 | Version 5 |
| 17.2 à 17.3, Python 3.12 | Version 4 |
| 16.4.1 à moins de 17, Python 3.12 | Version 3 |
| 15.4.10 à moins de 16, Python 3.12 | Version 3 |
| 15.4.10 jusqu'à la version inférieure à 16, Python 3.11 | Version 2 |