Delen via


Door de gebruiker gedefinieerde functies in Databricks Connect voor Python

Notitie

Dit artikel bevat informatie over Databricks Connect voor Databricks Runtime 13.3 en hoger.

Databricks Connect voor Python ondersteunt door de gebruiker gedefinieerde functies (UDF). Wanneer een DataFrame-bewerking met UDF's wordt uitgevoerd, worden de UDF's geserialiseerd door Databricks Connect en naar de server verzonden als onderdeel van de aanvraag.

Zie Door de gebruiker gedefinieerde functies in Databricks Connect voor Scala voor informatie over UDF's voor Databricks Connect voor Scala.

Notitie

Omdat de door de gebruiker gedefinieerde functie wordt geserialiseerd en gedeserialiseerd, moet de Python-versie van de client overeenkomen met de Python-versie op de Azure Databricks-berekening. Zie de -versieondersteuningsmatrixvoor ondersteunde versies.

Een UDF definiëren

Als u een UDF wilt maken in Databricks Connect voor Python, gebruikt u een van de volgende ondersteunde functies:

Het volgende Python-script stelt bijvoorbeeld een eenvoudige UDF in die de waarden in een kolom kwadrateert.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession

@udf(returnType=IntegerType())
def double(x):
    return x * x

spark = DatabricksSession.builder.getOrCreate()

df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))

df.show()

UDF's met afhankelijkheden

Belangrijk

Deze functie bevindt zich in openbare preview en vereist Databricks Connect voor Python 16.4 of hoger en een cluster met Databricks Runtime 16.4 of hoger. Als u deze functie wilt gebruiken, schakelt u de preview verbeterde Python UDF's in de Unity Catalogus in uw werkruimte in.

Databricks Connect biedt ondersteuning voor het opgeven van Python-afhankelijkheden die vereist zijn voor UDF's. Deze afhankelijkheden worden geïnstalleerd op Databricks Compute als onderdeel van de Python-omgeving van de UDF.

Met deze functie kunnen gebruikers afhankelijkheden opgeven die de UDF nodig heeft naast de pakketten in de basisomgeving. Het kan ook worden gebruikt om een andere versie van het pakket te installeren van wat is opgegeven in de basisomgeving.

Afhankelijkheden kunnen worden geïnstalleerd vanuit de volgende bronnen:

  • PyPI-pakketten
    • PyPI-pakketten kunnen worden opgegeven volgens PEP 508, bijvoorbeeld , dicepyjokes<1 of simplejson==3.19.*.
  • Bestanden die zijn opgeslagen in Unity Catalog-volumes
    • Zowel wheel packages (.whl) als gecomprimeerde tar-bestanden (.tar.gz) worden ondersteund. De gebruiker moet toestemming krijgen READ_FILE voor het bestand in het re:[UC]-volume.
    • Bij het installeren van pakketten van Unity Catalog-volumes om de UDF's aan te roepen, hebben gebruikers toestemming nodig READ VOLUME voor het bronvolume. Als u deze machtiging verleent aan alle accountgebruikers, wordt dit automatisch ingeschakeld voor nieuwe gebruikers.
    • Unity Catalog-volumesbestanden moeten worden opgegeven als dbfs:<path>bijvoorbeeld dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl of dbfs:/Volumes/users/someone@example.com/tars/my_private_deps.tar.gz.

Als u aangepaste afhankelijkheden in uw UDF wilt opnemen, geeft u deze op in een omgeving met behulp van withDependenciesen gebruikt u die omgeving om een Spark-sessie te maken. De afhankelijkheden worden geïnstalleerd op uw Databricks-rekenproces en zijn beschikbaar in alle UDF's die gebruikmaken van deze Spark-sessie.

De volgende code declareert het PyPI-pakket dice als een afhankelijkheid:

from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Of als u een afhankelijkheid van een wiel in een volume wilt opgeven:

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Gedrag in Databricks-notebooks en -taken

In notebooks en banen moeten UDF-afhankelijkheden rechtstreeks in de REPL worden geïnstalleerd. Databricks Connect valideert de REPL Python-omgeving door te controleren of alle opgegeven afhankelijkheden al zijn geïnstalleerd en genereert een uitzondering als deze niet zijn geïnstalleerd.

Validatie van notebookomgeving vindt plaats voor zowel PyPI- als Unity Catalog-volumeafhankelijkheden. Volumeafhankelijkheden moeten worden verpakt volgens de standaard python-verpakkingsspecificaties van PEP-427 of hoger voor wielbestanden en PEP-241 of hoger voor brondistributiebestanden. Raadpleeg de PyPA-documentatie voor meer informatie over Python-verpakkingsstandaarden.

Beperkingen

  • Bestanden zoals Python-wiel of brondistributie op uw lokale ontwikkelcomputer kunnen niet rechtstreeks als een afhankelijkheid worden opgegeven. Ze moeten eerst worden geüpload naar Unity Catalog-volumes.
  • UDF-afhankelijkheden ondersteunen pyspark.sql.streaming.DataStreamWriter.foreach en pyspark.sql.streaming.DataStreamWriter.foreachBatch vereisen Databricks Connect voor Python 18.0 of hoger en een cluster met Databricks Runtime 18.0 of hoger.
  • UDF-afhankelijkheden worden niet ondersteund voor pandas-aggregatie-UDF's via vensterfuncties.

Voorbeelden

In het volgende voorbeeld worden PyPI- en volumesafhankelijkheden in een omgeving gedefinieerd, wordt een sessie met die omgeving gemaakt en vervolgens UDF's gedefinieerd en aangeroepen die gebruikmaken van deze afhankelijkheden:

from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd

pypi_deps = ["pyjokes>=0.8,<1"]

volumes_deps = [
    # Example library from: https://pypi.org/project/dice/#files
    "dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0-py3-none-any.whl"
    # Example library from: https://pypi.org/project/simplejson/#files
    "dbfs:/Volumes/main/someone@example.com/test/simplejson-3.19.3.tar.gz",
]

env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

# UDFs
@udf(returnType=StringType())
def get_joke():
    from pyjokes import get_joke
    return get_joke()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    import simplejson
    return simplejson.loads(simplejson.dumps(x * 2))


@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
    import dice
    return a * b + dice.roll(f"1d10")[0]


df = spark.range(1, 10)
df = df.withColumns({
    "joke": get_joke(),
    "doubled": double_and_json_parse(col("id")),
    "mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()

Python-basisomgeving

UDF's worden uitgevoerd op de Databricks-berekening en niet op de client. De Python-basisomgeving waarin UDF's worden uitgevoerd, is afhankelijk van de Databricks-berekening.

Voor clusters is de python-basisomgeving de Python-omgeving van de Databricks Runtime-versie die wordt uitgevoerd op het cluster. De Python-versie en de lijst met pakketten in deze basisomgeving vindt u in de secties Systeemomgeving en geïnstalleerde Python-bibliotheken van de releaseopmerkingen van Databricks Runtime.

Voor serverloze berekening komt de basisomgeving van Python overeen met de serverloze omgevingsversie volgens de volgende tabel.

Databricks Connect-versie Serverloze UDF-omgeving
17.0 tot 17.3, Python 3.12 Versie 4
16.4.1 tot onder de 17, Python 3.12 Versie 3
15.4.10 tot onder 16, Python 3.12 Versie 3
15.4.10 tot onder 16, Python 3.11 Versie 2
15.4.0 tot 15.4.9 en 16.0 tot 16.3 Meest recente serverloze berekeningen. Migreer naar 15.4.10 LTS en hoger of 16.4.1 LTS en hoger voor een stabiele Python-omgeving.