Condividi tramite


Funzioni definite dall'utente in Databricks Connect per Python

Nota

Questo articolo illustra Databricks Connect per Databricks Runtime 13.3 e versioni successive.

Databricks Connect per Python supporta funzioni definite dall'utente. Quando viene eseguita un'operazione DataFrame che include UDF, queste vengono serializzate da Databricks Connect e inviate al server come parte della richiesta.

Per informazioni sulle funzioni definite dall'utente in Databricks Connect per Scala, vedere Funzioni definite dall'utente in Databricks Connect per Scala.

Nota

Poiché la funzione definita dall'utente viene serializzata e deserializzata, la versione Python del client deve corrispondere alla versione python nel calcolo di Azure Databricks. Per le versioni supportate, vedere la matrice di supporto della versione.

Definire una funzione definita dall'utente

Per creare una UDF (funzione definita dall'utente) in Databricks Connect per Python, utilizzare una delle funzioni supportate seguenti:

Ad esempio, il seguente script Python configura una semplice funzione definita dall'utente (UDF) che eleva al quadrato i valori nella colonna.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession

@udf(returnType=IntegerType())
def double(x):
    return x * x

spark = DatabricksSession.builder.getOrCreate()

df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))

df.show()

Funzioni definite dall'utente con dipendenze

Importante

Questa funzionalità è disponibile in anteprima pubblica e richiede Databricks Connect per Python 16.4 o versione successiva e un cluster che esegue Databricks Runtime 16.4 o versione successiva. Per usare questa funzionalità, abilita l'anteprima delle funzioni definite dall'utente Python avanzate nel Unity Catalog nell'area di lavoro.

Databricks Connect supporta la specifica delle dipendenze Python necessarie per UDFs (funzioni definite dall'utente). Queste dipendenze vengono installate nel calcolo di Databricks come parte dell'ambiente Python della funzione definita dall'utente.

Questa funzionalità consente agli utenti di specificare le dipendenze necessarie per la funzione definita dall'utente (UDF) oltre ai pacchetti forniti nell'ambiente di base. Può anche essere usato per installare una versione diversa del pacchetto da quello fornito nell'ambiente di base.

Le dipendenze possono essere installate dalle origini seguenti:

  • Pacchetti PyPI
    • I pacchetti PyPI possono essere specificati in base a PEP 508, ad esempio , dicepyjokes<1 o simplejson==3.19.*.
  • File archiviati nei volumi del catalogo Unity
    • Sono supportati sia i pacchetti wheel (.whl) che i file tar gzipped (.tar.gz). All'utente deve essere concessa READ_FILE l'autorizzazione per il file nel volume re:[UC].
    • Quando si installano pacchetti dai volumi del catalogo Unity, per invocare le UDF (funzioni definite dall'utente), gli utenti devono disporre READ VOLUME dell'autorizzazione per il volume di origine. La concessione di questa autorizzazione a tutti gli utenti dell'account abilita automaticamente questa autorizzazione per i nuovi utenti.
    • I file dei volumi del catalogo Unity devono essere specificati come dbfs:<path>, ad esempio dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl o dbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz.

Per includere dipendenze personalizzate nella funzione definita dall'utente, definirle in un ambiente usando withDependencies, quindi utilizzare quell'ambiente per creare una sessione Spark. Le dipendenze vengono installate nell'ambiente di calcolo Databricks e saranno disponibili in tutte le UDF che utilizzano questa sessione Spark.

Il codice seguente dichiara il pacchetto dice PyPI come dipendenza:

from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

In alternativa, per specificare una dipendenza di una rotellina in un volume:

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Comportamento nei notebook e nei lavori di Databricks

Nei notebook e nei processi, le dipendenze della funzione definita dall'utente devono essere installate direttamente nel REPL. Databricks Connect convalida l'ambiente Python REPL verificando che tutte le dipendenze specificate siano già installate e generi un'eccezione se non sono installate.

La convalida dell'ambiente notebook viene eseguita sia per le dipendenze di volume PyPI che per il volume del Catalogo Unity. Le dipendenze del volume devono essere inserite in un pacchetto seguendo le specifiche standard per la creazione di pacchetti Python da PEP-427 o versioni successive per i file wheel e PEP-241 o versione successiva per i file di distribuzione di origine. Per altre informazioni sugli standard di creazione di pacchetti Python, vedere la documentazione di PyPA.

Limitazioni

  • I file come la rotellina Python o la distribuzione di origine nel computer di sviluppo locale non possono essere specificati direttamente come dipendenza. Devono prima essere caricati nei volumi del catalogo Unity.
  • Il supporto delle dipendenze UDF per pyspark.sql.streaming.DataStreamWriter.foreach e pyspark.sql.streaming.DataStreamWriter.foreachBatch richiede Databricks Connect per Python 18.0 o versione successiva e un cluster che esegue Databricks Runtime 18.0 o versione successiva.
  • Le dipendenze UDF non sono supportate per le funzioni di aggregazione pandas definite dall'utente sulle funzioni di finestra.

Esempi

L'esempio seguente definisce le dipendenze di PyPI e di volume in un ambiente, crea una sessione con tale ambiente, quindi definisce e chiama funzioni definite dall'utente che utilizzano queste dipendenze.

from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd

pypi_deps = ["pyjokes>=0.8,<1"]

volumes_deps = [
    # Example library from: https://pypi.org/project/dice/#files
    "dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl"
    # Example library from: https://pypi.org/project/simplejson/#files
    "dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]

env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

# UDFs
@udf(returnType=StringType())
def get_joke():
    from pyjokes import get_joke
    return get_joke()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    import simplejson
    return simplejson.loads(simplejson.dumps(x * 2))


@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
    import dice
    return a * b + dice.roll(f"1d10")[0]


df = spark.range(1, 10)
df = df.withColumns({
    "joke": get_joke(),
    "doubled": double_and_json_parse(col("id")),
    "mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()

Ambiente di base Python

Le funzioni definite dall'utente vengono eseguite nel calcolo di Databricks e non nel client. L'ambiente Python di base in cui vengono eseguite le funzioni definite dall'utente dipende dal calcolo di Databricks.

Per i cluster, l'ambiente Python di base è l'ambiente Python della versione di Databricks Runtime in esecuzione nel cluster. La versione di Python e l'elenco dei pacchetti in questo ambiente di base sono disponibili nelle sezioni Ambiente di sistema e Librerie Python installate delle note sulla versione di Databricks Runtime.

Per il calcolo serverless, l'ambiente Python di base corrisponde alla versione dell'ambiente serverless in base alla tabella seguente.

Versione di Databricks Connect Ambiente serverless UDF
Da 17.0 a 17.3, Python 3.12 Versione 4
Da 16.4.1 a 17, Python 3.12 Versione 3
Da 15.4.10 a 16, Python 3.12 Versione 3
Da 15.4.10 a 16, Python 3.11 Versione 2
Da 15.4.0 a 15.4.9 e da 16.0 a 16.3 Calcolo serverless più recente. Eseguire la migrazione a 15.4.10 LTS o versioni superiori e 16.4.1 LTS o versioni superiori per un ambiente Python stabile.