Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Nota
Questo articolo illustra Databricks Connect per Databricks Runtime 13.3 e versioni successive.
Databricks Connect per Python supporta funzioni definite dall'utente. Quando viene eseguita un'operazione DataFrame che include UDF, queste vengono serializzate da Databricks Connect e inviate al server come parte della richiesta.
Per informazioni sulle funzioni definite dall'utente in Databricks Connect per Scala, vedere Funzioni definite dall'utente in Databricks Connect per Scala.
Nota
Poiché la funzione definita dall'utente viene serializzata e deserializzata, la versione Python del client deve corrispondere alla versione python nel calcolo di Azure Databricks. Per le versioni supportate, vedere la matrice di supporto della versione.
Definire una funzione definita dall'utente
Per creare una UDF (funzione definita dall'utente) in Databricks Connect per Python, utilizzare una delle funzioni supportate seguenti:
- Funzioni pySpark definite dall'utente
- Funzioni di streaming PySpark
Ad esempio, il seguente script Python configura una semplice funzione definita dall'utente (UDF) che eleva al quadrato i valori nella colonna.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession
@udf(returnType=IntegerType())
def double(x):
return x * x
spark = DatabricksSession.builder.getOrCreate()
df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))
df.show()
Funzioni definite dall'utente con dipendenze
Importante
Questa funzionalità è disponibile in anteprima pubblica e richiede Databricks Connect per Python 16.4 o versione successiva e un cluster che esegue Databricks Runtime 16.4 o versione successiva. Per usare questa funzionalità, abilita l'anteprima delle funzioni definite dall'utente Python avanzate nel Unity Catalog nell'area di lavoro.
Databricks Connect supporta la specifica delle dipendenze Python necessarie per UDFs (funzioni definite dall'utente). Queste dipendenze vengono installate nel calcolo di Databricks come parte dell'ambiente Python della funzione definita dall'utente.
Questa funzionalità consente agli utenti di specificare le dipendenze necessarie per la funzione definita dall'utente (UDF) oltre ai pacchetti forniti nell'ambiente di base. Può anche essere usato per installare una versione diversa del pacchetto da quello fornito nell'ambiente di base.
Le dipendenze possono essere installate dalle origini seguenti:
- Pacchetti PyPI
- I pacchetti PyPI possono essere specificati in base a PEP 508, ad esempio ,
dicepyjokes<1osimplejson==3.19.*.
- I pacchetti PyPI possono essere specificati in base a PEP 508, ad esempio ,
- File archiviati nei volumi del catalogo Unity
- Sono supportati sia i pacchetti wheel (
.whl) che i file tar gzipped (.tar.gz). All'utente deve essere concessaREAD_FILEl'autorizzazione per il file nel volume re:[UC]. - Quando si installano pacchetti dai volumi del catalogo Unity, per invocare le UDF (funzioni definite dall'utente), gli utenti devono disporre
READ VOLUMEdell'autorizzazione per il volume di origine. La concessione di questa autorizzazione a tutti gli utenti dell'account abilita automaticamente questa autorizzazione per i nuovi utenti. - I file dei volumi del catalogo Unity devono essere specificati come
dbfs:<path>, ad esempiodbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whlodbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz.
- Sono supportati sia i pacchetti wheel (
Per includere dipendenze personalizzate nella funzione definita dall'utente, definirle in un ambiente usando withDependencies, quindi utilizzare quell'ambiente per creare una sessione Spark. Le dipendenze vengono installate nell'ambiente di calcolo Databricks e saranno disponibili in tutte le UDF che utilizzano questa sessione Spark.
Il codice seguente dichiara il pacchetto dice PyPI come dipendenza:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
In alternativa, per specificare una dipendenza di una rotellina in un volume:
from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
Comportamento nei notebook e nei lavori di Databricks
Nei notebook e nei processi, le dipendenze della funzione definita dall'utente devono essere installate direttamente nel REPL. Databricks Connect convalida l'ambiente Python REPL verificando che tutte le dipendenze specificate siano già installate e generi un'eccezione se non sono installate.
La convalida dell'ambiente notebook viene eseguita sia per le dipendenze di volume PyPI che per il volume del Catalogo Unity. Le dipendenze del volume devono essere inserite in un pacchetto seguendo le specifiche standard per la creazione di pacchetti Python da PEP-427 o versioni successive per i file wheel e PEP-241 o versione successiva per i file di distribuzione di origine. Per altre informazioni sugli standard di creazione di pacchetti Python, vedere la documentazione di PyPA.
Limitazioni
- I file come la rotellina Python o la distribuzione di origine nel computer di sviluppo locale non possono essere specificati direttamente come dipendenza. Devono prima essere caricati nei volumi del catalogo Unity.
- Il supporto delle dipendenze UDF per
pyspark.sql.streaming.DataStreamWriter.foreachepyspark.sql.streaming.DataStreamWriter.foreachBatchrichiede Databricks Connect per Python 18.0 o versione successiva e un cluster che esegue Databricks Runtime 18.0 o versione successiva. - Le dipendenze UDF non sono supportate per le funzioni di aggregazione pandas definite dall'utente sulle funzioni di finestra.
Esempi
L'esempio seguente definisce le dipendenze di PyPI e di volume in un ambiente, crea una sessione con tale ambiente, quindi definisce e chiama funzioni definite dall'utente che utilizzano queste dipendenze.
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd
pypi_deps = ["pyjokes>=0.8,<1"]
volumes_deps = [
# Example library from: https://pypi.org/project/dice/#files
"dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl"
# Example library from: https://pypi.org/project/simplejson/#files
"dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]
env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()
# UDFs
@udf(returnType=StringType())
def get_joke():
from pyjokes import get_joke
return get_joke()
@udf(returnType=IntegerType())
def double_and_json_parse(x):
import simplejson
return simplejson.loads(simplejson.dumps(x * 2))
@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
import dice
return a * b + dice.roll(f"1d10")[0]
df = spark.range(1, 10)
df = df.withColumns({
"joke": get_joke(),
"doubled": double_and_json_parse(col("id")),
"mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()
Ambiente di base Python
Le funzioni definite dall'utente vengono eseguite nel calcolo di Databricks e non nel client. L'ambiente Python di base in cui vengono eseguite le funzioni definite dall'utente dipende dal calcolo di Databricks.
Per i cluster, l'ambiente Python di base è l'ambiente Python della versione di Databricks Runtime in esecuzione nel cluster. La versione di Python e l'elenco dei pacchetti in questo ambiente di base sono disponibili nelle sezioni Ambiente di sistema e Librerie Python installate delle note sulla versione di Databricks Runtime.
Per il calcolo serverless, l'ambiente Python di base corrisponde alla versione dell'ambiente serverless in base alla tabella seguente.
| Versione di Databricks Connect | Ambiente serverless UDF |
|---|---|
| Da 17.0 a 17.3, Python 3.12 | Versione 4 |
| Da 16.4.1 a 17, Python 3.12 | Versione 3 |
| Da 15.4.10 a 16, Python 3.12 | Versione 3 |
| Da 15.4.10 a 16, Python 3.11 | Versione 2 |
| Da 15.4.0 a 15.4.9 e da 16.0 a 16.3 | Calcolo serverless più recente. Eseguire la migrazione a 15.4.10 LTS o versioni superiori e 16.4.1 LTS o versioni superiori per un ambiente Python stabile. |