Nota
O acesso a esta página requer autorização. Pode tentar iniciar sessão ou alterar os diretórios.
O acesso a esta página requer autorização. Pode tentar alterar os diretórios.
Observação
Este artigo aborda o Databricks Connect para o Databricks Runtime 13.3 e versões superiores.
O Databricks Connect for Python suporta funções definidas pelo usuário (UDF). Quando uma operação DataFrame que inclui UDFs é executada, as UDFs são serializadas pelo Databricks Connect e enviadas ao servidor como parte da solicitação.
Para obter informações sobre UDFs para Databricks Connect for Scala, consulte Funções definidas pelo usuário no Databricks Connect for Scala.
Observação
Como a função definida pelo usuário é serializada e desserializada, a versão Python do cliente deve corresponder à versão Python na computação do Azure Databricks. Para versões suportadas, consulte a matriz de suporte da versão .
Definir uma UDF
Para criar um UDF no Databricks Connect for Python, use uma das seguintes funções suportadas:
- Funções definidas pelo usuário do PySpark
- Funções de streaming PySpark
Por exemplo, o Python a seguir configura um UDF simples que quadra os valores em uma coluna.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession
@udf(returnType=IntegerType())
def double(x):
return x * x
spark = DatabricksSession.builder.getOrCreate()
df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))
df.show()
Gerir dependências de UDF
Importante
Esse recurso está no Public Preview e requer o Databricks Connect para Python 16.4 ou superior e um cluster executando o Databricks Runtime 16.4 ou superior. Para usar esse recurso, habilite a visualização UDFs Python aprimoradas no Unity Catalog em seu espaço de trabalho.
O Databricks Connect suporta a especificação de dependências Python que são necessárias para UDFs. Essas dependências são instaladas na plataforma Databricks como parte do ambiente Python da UDF.
Esse recurso permite que os usuários especifiquem dependências que o UDF precisa além dos pacotes fornecidos no ambiente base. Ele também pode ser usado para instalar uma versão diferente do pacote do que é fornecido no ambiente base.
As dependências podem ser instaladas a partir das seguintes fontes:
- Pacotes PyPI
- Os pacotes PyPI podem ser especificados de acordo com o PEP 508, por exemplo,
dicepyjokes<1ousimplejson==3.19.*.
- Os pacotes PyPI podem ser especificados de acordo com o PEP 508, por exemplo,
- Pacotes armazenados em volumes do Catálogo Unity
- São suportadas tanto distribuições construídas (
.whl) como distribuições de origem (.tar.gz). - Os pacotes de volumes do Unity Catalog podem ser especificados como
dbfs:<path>, por exemplo,dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whloudbfs:/Volumes/users/someone@example.com/tars/my_private_dep-4.0.0.tar.gz. - O utilizador deve receber permissão
READ_FILEno ficheiro no volume re:[UC]. Conceder essa permissão a todos os usuários da conta habilita isso automaticamente para novos usuários.
- São suportadas tanto distribuições construídas (
- Pacotes locais, pastas e ficheiros Python
- Distribuições locais construídas (
.whl), distribuições de origem (.tar.gz), pastas e ficheiros Python podem ser especificados comolocal:<path>, por exemplo:local:/path/to/my_private_dep-3.20.2-py3-none-any.whl,local:/path/to/my_private_dep-4.0.0.tar.gz,local:/path/to/my_folder,local:/path/to/my_file.py. - Tanto os caminhos absolutos como os relativos são suportados, por exemplo:
local:/path/to/my_file.pyoulocal:./path/to/my_file.py.
- Distribuições locais construídas (
Para incluir dependências personalizadas em seu UDF, especifique-as em um ambiente usando withDependencieso e, em seguida, use esse ambiente para criar uma sessão do Spark. As dependências são instaladas na sua infraestrutura computacional Databricks e estarão disponíveis em todas as UDFs que utilizam esta sessão do Spark.
O código a seguir declara o pacote dice PyPI como uma dependência:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Ou, para especificar uma dependência de uma roda em um volume:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Comportamento em notebooks e tarefas Databricks
Em notebooks e trabalhos, as dependências UDF precisam ser instaladas diretamente no REPL. O Databricks Connect valida o ambiente REPL Python verificando se todas as dependências especificadas já estão instaladas e lança uma exceção se alguma não estiver instalada. A validação do ambiente de notebooks é executada tanto para dependências de volumes do PyPI como do Unity Catalog, mas não para dependências locais.
Limitações
- O suporte a dependências UDF para
pyspark.sql.streaming.DataStreamWriter.foreachrequer Databricks Connect para Python 18.0 ou superior, e um cluster em execução com Databricks Runtime 18.0 ou superior. - O suporte para
pyspark.sql.streaming.DataStreamWriter.foreachBatchde dependências UDF requer o Databricks Connect para Python 18.0 ou superior e um cluster a correr Databricks Runtime 18.0 ou superior. A funcionalidade não é suportada em serverless. - O suporte a dependências UDF para pacotes locais, pastas e ficheiros Python requer o Databricks Connect para Python 18.1 ou superior, e um cluster a correr Databricks Runtime 18.1 ou superior.
- As dependências de UDF não são suportadas para UDFs de agregação de pandas sobre funções de janela.
- Os volumes do Unity Catalog e os pacotes locais devem ser empacotados seguindo as especificações padrão de embalagem Python do PEP-427 ou posteriores para distribuições construídas em roda e PEP-241 ou posteriores para distribuições de origem tar. Para obter mais informações sobre padrões de empacotamento Python, consulte a documentação do PyPA.
Exemplos
O exemplo a seguir define dependências de PyPI e volumes em um ambiente, cria uma sessão com esse ambiente e, em seguida, define e chama UDFs que usam essas dependências:
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd
pypi_deps = ["pyjokes>=0.8,<1"]
volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0.tar.gz",
]
local_deps = [
# Example library from: https://pypi.org/project/simplejson/#files
"local:./test/simplejson-3.20.2-py3-none-any.whl",
]
env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps).withDependencies(local_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))
@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]
df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()
Gestão automática das dependências do UDF
Importante
Esta funcionalidade está em Pré-visualização Pública e requer o Databricks Connect para Python 18.1 ou superior, Python 3.12 na sua máquina local e um cluster a correr Databricks Runtime 18.1 ou superior. Para usar esse recurso, habilite a visualização UDFs Python aprimoradas no Unity Catalog em seu espaço de trabalho.
A API Databricks Connect withAutoDependencies() permite a descoberta automática e o carregamento de módulos locais e dependências públicas de PyPI usadas nas instruções de importação dos seus UDFs. Elimina a necessidade de especificar manualmente as dependências.
O seguinte código permite a gestão automática de dependências:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
O withAutoDependencies() método aceita os seguintes parâmetros:
-
upload_local: Quando definido paraTrue, os módulos locais importados nos seus UDFs são automaticamente descobertos, empacotados e carregados para o sandbox UDF. -
use_index: Quando definido paraTrue, as dependências públicas do PyPI usadas nos seus UDFs são automaticamente descobertas e instaladas na infraestrutura de computação do Azure Databricks. O processo de descoberta utiliza os pacotes instalados na sua máquina local para determinar as versões, garantindo consistência entre o ambiente local e o ambiente de execução remota.
Limitações
- Importações dinâmicas (por exemplo,
importlib.import_module("foo")) não são suportadas. - Pacotes de namespace (por exemplo,
azure.eventhubegoogle.cloud.aiplatform) não são suportados. - Dependências instaladas usando referências diretas de URL não são suportadas. Isto inclui aqueles instalados a partir de ficheiros 'wheel' locais.
- Dependências instaladas a partir de índices privados de pacotes não são suportadas. Pacotes instalados desta forma não podem ser distinguidos dos pacotes instalados pelo PyPI público.
- A descoberta de dependências não funciona numa shell Python. Apenas scripts Python, shell IPython e Jupyter Notebooks são suportados.
Exemplos
O exemplo seguinte demonstra a gestão automática de dependências tanto com módulos locais como com pacotes PyPI. Este exemplo exige que tenha instalado simplejson e dice (usando pip install simplejson dice).
Primeiro, crie módulos auxiliares locais:
# my_helper.py
def double(x):
return 2 * x
# my_json.py
import simplejson
def loads(x):
return simplejson.loads(x)
def dumps(x):
return simplejson.dumps(x)
Depois, no teu script principal, importa estes módulos e usa-os em UDFs:
# main.py
import dice as dc
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType
import my_json
from my_helper import double
env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
return my_json.loads(my_json.dumps(double(x)))
@udf(returnType=FloatType())
def sum_and_add_noise(x, y):
return x + y + (dc.roll("d6")[0] / 6)
df = spark.range(1, 10)
df = df.withColumns({
"doubled": double_and_json_parse(col("id")),
"summed_with_noise": sum_and_add_noise(col("id"), col("doubled")),
})
df.show()
Registo
Para obter dependências descobertas, defina a SPARK_CONNECT_LOG_LEVEL variável de ambiente para info ou debug. Alternativamente, configure o framework de registo em Python:
import logging
logging.basicConfig(level=logging.INFO)
Os logs relevantes são emitidos pelo databricks.connect.auto_dependencies módulo, por exemplo:
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_json
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_helper
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: simplejson for module simplejson
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: dice for module dice
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_json
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_helper
INFO:databricks.connect.auto_dependencies.hook:Updated simplejson with auto-detected version ==3.20.2
INFO:databricks.connect.auto_dependencies.hook:Updated dice with auto-detected version ==4.0.0
Ambiente base Python
UDFs são executados na computação da Databricks e não no cliente. O ambiente Python base no qual UDFs são executados depende da computação do Databricks.
Para clusters, o ambiente Python base é o ambiente Python da versão do Databricks Runtime em execução no cluster. A versão do Python e a lista de pacotes neste ambiente base são encontradas nas seções Ambiente do sistema e Bibliotecas Python instaladas das notas de versão do Databricks Runtime.
Para computação sem servidor, o ambiente Python base corresponde à versão do ambiente sem servidor de acordo com a tabela a seguir. As versões do Databricks Connect não listadas nesta tabela ou ainda não suportam serverless ou já chegaram ao fim do suporte. Veja a matriz de suporte de versões e as versões de Databricks Connect que atingiram o fim do suporte.
| Versão do Databricks Connect | Ambiente UDF sem servidor |
|---|---|
| 18.0, Python 3.12 | Versão 5 |
| 17.2 a 17.3, Python 3.12 | Versão 4 |
| 16.4.1 até abaixo de 17, Python 3.12 | Versão 3 |
| 15.4.10 para abaixo de 16, Python 3.12 | Versão 3 |
| 15.4.10 até abaixo de 16, Python 3.11 | Versão 2 |