Nota
L'accés a aquesta pàgina requereix autorització. Podeu provar d'iniciar la sessió o de canviar els directoris.
L'accés a aquesta pàgina requereix autorització. Podeu provar de canviar els directoris.
Nota:
En este artículo se describe Databricks Connect para Databricks Runtime 13.3 y versiones posteriores.
Databricks Connect para Python admite funciones definidas por el usuario (UDF). Cuando se ejecuta una operación DataFrame que incluye UDF, Databricks Connect serializa las UDF y se envían al servidor como parte de la solicitud.
Para obtener información sobre las UDF para Databricks Connect para Scala, consulte Funciones definidas por el usuario en Databricks Connect para Scala.
Nota:
Dado que la función definida por el usuario se serializa y deserializa, la versión de Python del cliente debe coincidir con la versión de Python en el proceso de Azure Databricks. Para ver las versiones admitidas, consulte la matriz de compatibilidad de versiones.
Definir una UDF
Para crear una UDF en Databricks Connect para Python, use una de las siguientes funciones admitidas:
- Funciones definidas por el usuario de PySpark
- Funciones de streaming de PySpark
Por ejemplo, en el siguiente código Python se configura una UDF simple que cuadra los valores de una columna.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession
@udf(returnType=IntegerType())
def double(x):
return x * x
spark = DatabricksSession.builder.getOrCreate()
df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))
df.show()
Administración de dependencias de UDF
Importante
Esta característica está en versión preliminar pública y requiere Databricks Connect para Python 16.4 o superior, y un clúster que ejecuta Databricks Runtime 16.4 o superior. Para usar esta característica, habilite la vista previa de UDFs mejoradas de Python en el catálogo de Unity en su espacio de trabajo.
Databricks Connect admite la especificación de dependencias de Python necesarias para las UDF. Estas dependencias se instalan en el proceso de Databricks como parte del entorno de Python de UDF.
Esta característica permite a los usuarios especificar las dependencias que necesita la UDF además de los paquetes proporcionados en el entorno base. También se puede usar para instalar una versión diferente del paquete a partir de lo que se proporciona en el entorno base.
Las dependencias se pueden instalar desde los siguientes orígenes:
- Paquetes PyPI
- Los paquetes PyPI se pueden especificar según PEP 508, por ejemplo,
dice,pyjokes<1osimplejson==3.19.*.
- Los paquetes PyPI se pueden especificar según PEP 508, por ejemplo,
- Paquetes almacenados en volúmenes de Catálogo de Unity
- Se admiten las distribuciones compiladas (
.whl) y las distribuciones de origen (.tar.gz). - Los paquetes de volúmenes de catálogo de Unity se pueden especificar como
dbfs:<path>, por ejemplo,dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whlodbfs:/Volumes/users/someone@example.com/tars/my_private_dep-4.0.0.tar.gz. - Se debe conceder al usuario el permiso
READ_FILEen el archivo del volumen re:[UC]. Conceder este permiso a todos los usuarios de la cuenta permite esto automáticamente para los nuevos usuarios.
- Se admiten las distribuciones compiladas (
- Paquetes locales, carpetas y archivos de Python
- Las distribuciones compiladas locales (
.whl), las distribuciones de origen (.tar.gz), las carpetas y los archivos de Python se pueden especificar comolocal:<path>, por ejemplo:local:/path/to/my_private_dep-3.20.2-py3-none-any.whl,local:/path/to/my_private_dep-4.0.0.tar.gz,local:/path/to/my_folder, .local:/path/to/my_file.py - Se admiten rutas de acceso absolutas y relativas, por ejemplo:
local:/path/to/my_file.pyolocal:./path/to/my_file.py.
- Las distribuciones compiladas locales (
Para incluir dependencias personalizadas en la UDF, especifíquelas en un entorno mediante withDependenciesy, a continuación, use ese entorno para crear una sesión de Spark. Las dependencias se instalan en el entorno de Databricks y estarán disponibles en cualquier UDF que use esta sesión de Spark.
El código siguiente declara el paquete dice pyPI como una dependencia:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
O bien, para indicar la dependencia de una rueda en un volumen:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Modo de uso en cuadernos y trabajos de Databricks
En cuadernos y trabajos, las dependencias de UDF deben instalarse directamente en REPL. Databricks Connect valida el entorno de Python de REPL comprobando que todas las dependencias especificadas ya están instaladas y produce una excepción si no están instaladas. La validación del entorno del cuaderno se ejecuta tanto para las dependencias de volumen de PyPI como para el catálogo de Unity, pero no para las dependencias locales.
Limitaciones
- La compatibilidad de las dependencias de UDF con
pyspark.sql.streaming.DataStreamWriter.foreachrequiere Databricks Connect para Python 18.0 o superior y un clúster que ejecute Databricks Runtime 18.0 o superior. - La compatibilidad con las dependencias de UDF para
pyspark.sql.streaming.DataStreamWriter.foreachBatchrequiere Databricks Connect for Python 18.0 o superior, y un clúster que ejecute Databricks Runtime 18.0 o superior. La funcionalidad no es compatible con arquitecturas sin servidor. - La compatibilidad con las dependencias de UDF para paquetes locales, carpetas y archivos de Python requiere Databricks Connect para Python 18.1 o superior, y un clúster que ejecuta Databricks Runtime 18.1 o superior.
- Las dependencias de funciones definidas por el usuario (UDF) no se admiten para las UDF de agregación de pandas sobre funciones de ventana.
- Los paquetes de volúmenes del catálogo de Unity y los paquetes locales deben empaquetarse siguiendo las especificaciones de empaquetado estándar de Python de PEP-427 o posterior para las distribuciones compiladas por wheel y PEP-241 o posterior para las distribuciones de origen tar. Para más información sobre los estándares de empaquetado de Python, consulte la documentación de PyPA.
Ejemplos
En el siguiente ejemplo se definen las dependencias de PyPI y volúmenes en un entorno, se crea una sesión con ese entorno y se definen y llaman a las UDF que usan esas dependencias.
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd
pypi_deps = ["pyjokes>=0.8,<1"]
volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0.tar.gz",
]
local_deps = [
# Example library from: https://pypi.org/project/simplejson/#files
"local:./test/simplejson-3.20.2-py3-none-any.whl",
]
env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps).withDependencies(local_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))
@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]
df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()
Administración automática de dependencias de UDF
Importante
Esta característica está en versión preliminar pública y requiere Databricks Connect para Python 18.1 o superior, Python 3.12 en la máquina local y un clúster que ejecute Databricks Runtime 18.1 o superior. Para usar esta característica, habilite la vista previa de UDFs mejoradas de Python en el catálogo de Unity en su espacio de trabajo.
La API de Databricks Connect withAutoDependencies() permite la detección y carga automática de módulos locales y dependencias públicas de PyPI usadas en las instrucciones de importación de las UDF. Quita la necesidad de especificar manualmente las dependencias.
El código siguiente habilita la administración automática de dependencias:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
El withAutoDependencies() método acepta los parámetros siguientes:
-
upload_local: Cuando se establece enTrue, los módulos locales importados en tus UDF se detectan, empaquetan y cargan automáticamente en el espacio aislado de UDF. -
use_index: cuando se establece enTrue, las dependencias públicas de PyPI usadas en tus UDFs se detectan e instalan automáticamente en el entorno de cómputo de Azure Databricks. El proceso de detección usa los paquetes instalados en la máquina local para determinar las versiones, lo que garantiza la coherencia entre el entorno local y el entorno de ejecución remota.
Limitaciones
- No se admiten las importaciones dinámicas (por ejemplo,
importlib.import_module("foo")). - No se admiten paquetes de espacio de nombres (por ejemplo,
azure.eventhubygoogle.cloud.aiplatform). - No se admiten las dependencias instaladas mediante referencias de dirección URL directa. Esto incluye los instalados desde archivos de paquetes locales.
- No se admiten las dependencias instaladas desde índices de paquetes privados. Los paquetes instalados de esta manera no se pueden distinguir de los paquetes instalados desde el PyPI público.
- La detección de dependencias no funciona en un shell de Python. Solamente se admiten scripts de Python, shell de IPython y Cuadernos de Jupyter.
Ejemplos
En el ejemplo siguiente se muestra la administración automática de dependencias con módulos locales y paquetes pyPI. En este ejemplo se requiere que haya instalado simplejson y dice (mediante pip install simplejson dice).
En primer lugar, cree módulos auxiliares locales:
# my_helper.py
def double(x):
return 2 * x
# my_json.py
import simplejson
def loads(x):
return simplejson.loads(x)
def dumps(x):
return simplejson.dumps(x)
A continuación, en el script principal, importe estos módulos y úselos en UDF:
# main.py
import dice as dc
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType
import my_json
from my_helper import double
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
return my_json.loads(my_json.dumps(double(x)))
@udf(returnType=FloatType())
def sum_and_add_noise(x, y):
return x + y + (dc.roll("d6")[0] / 6)
df = spark.range(1, 10)
df = df.withColumns({
"doubled": double_and_json_parse(col("id")),
"summed_with_noise": sum_and_add_noise(col("id"), col("doubled")),
})
df.show()
Registro
Para generar las dependencias detectadas, establezca la variable de entorno SPARK_CONNECT_LOG_LEVEL en info o debug. Como alternativa, configure el marco de registro de Python:
import logging
logging.basicConfig(level=logging.INFO)
Los registros pertinentes son emitidos por el módulo databricks.connect.auto_dependencies, por ejemplo:
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_json
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_helper
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: simplejson for module simplejson
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: dice for module dice
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_json
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_helper
INFO:databricks.connect.auto_dependencies.hook:Updated simplejson with auto-detected version ==3.20.2
INFO:databricks.connect.auto_dependencies.hook:Updated dice with auto-detected version ==4.0.0
Entorno base de Python
Las UDF se ejecutan en el proceso de Databricks y no en el cliente. El entorno base de Python en el que se ejecutan las UDFs depende del cómputo de Databricks.
En el caso de los clústeres, el entorno de Python base es el entorno de Python de la versión de Databricks Runtime que se ejecuta en el clúster. La versión de Python y la lista de paquetes de este entorno base se encuentran en las secciones Entorno del sistema y Bibliotecas de Python instaladas de las notas de la versión de Databricks Runtime.
Para el proceso sin servidor, el entorno de Python base corresponde a la versión del entorno sin servidor según la tabla siguiente. Las versiones de Databricks Connect que no aparecen en esta tabla aún no son compatibles con el modo sin servidor o han llegado al final del soporte. Consulte la matriz de versiones compatibles y las versiones fin de soporte de Databricks Connect.
| Versión de Databricks Connect | Entorno sin servidor UDF |
|---|---|
| 18.0, Python 3.12 | Versión 5 |
| Versiones 17.2 a 17.3, Python 3.12 | Versión 4 |
| 16.4.1 a menos de 17, Python 3.12 | Versión 3 |
| 15.4.10 a menos de 16, Python 3.12 | Versión 3 |
| 15.4.10 a menos de 16, Python 3.11 | Versión 2 |