Share via


Door de gebruiker gedefinieerde functies in Databricks Connect voor Python

Notitie

Dit artikel bevat informatie over Databricks Connect voor Databricks Runtime 13.3 en hoger.

Databricks Connect voor Python ondersteunt door de gebruiker gedefinieerde functies (UDF). Wanneer een DataFrame-bewerking met UDF's wordt uitgevoerd, worden de UDF's geserialiseerd door Databricks Connect en naar de server verzonden als onderdeel van de aanvraag.

Zie Door de gebruiker gedefinieerde functies in Databricks Connect voor Scala voor informatie over UDF's voor Databricks Connect voor Scala.

Notitie

Omdat de door de gebruiker gedefinieerde functie wordt geserialiseerd en gedeserialiseerd, moet de Python-versie van de client overeenkomen met de Python-versie op de Azure Databricks-berekening. Zie de -versieondersteuningsmatrixvoor ondersteunde versies.

Een UDF definiëren

Als u een UDF wilt maken in Databricks Connect voor Python, gebruikt u een van de volgende ondersteunde functies:

Het volgende Python-script stelt bijvoorbeeld een eenvoudige UDF in die de waarden in een kolom kwadrateert.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from databricks.connect import DatabricksSession

@udf(returnType=IntegerType())
def double(x):
    return x * x

spark = DatabricksSession.builder.getOrCreate()

df = spark.range(1, 2)
df = df.withColumn("doubled", double(col("id")))

df.show()

UDF-afhankelijkheden beheren

Belangrijk

Deze functie bevindt zich in openbare preview en vereist Databricks Connect voor Python 16.4 of hoger en een cluster met Databricks Runtime 16.4 of hoger. Als u deze functie wilt gebruiken, schakelt u de preview verbeterde Python UDF's in de Unity Catalogus in uw werkruimte in.

Databricks Connect biedt ondersteuning voor het opgeven van Python-afhankelijkheden die vereist zijn voor UDF's. Deze afhankelijkheden worden geïnstalleerd op Databricks Compute als onderdeel van de Python-omgeving van de UDF.

Met deze functie kunnen gebruikers afhankelijkheden opgeven die de UDF nodig heeft naast de pakketten in de basisomgeving. Het kan ook worden gebruikt om een andere versie van het pakket te installeren van wat is opgegeven in de basisomgeving.

Afhankelijkheden kunnen worden geïnstalleerd vanuit de volgende bronnen:

  • PyPI-pakketten
    • PyPI-pakketten kunnen worden opgegeven volgens PEP 508, bijvoorbeeld , dicepyjokes<1 of simplejson==3.19.*.
  • Pakketten die zijn opgeslagen in Unity Catalog-volumes
    • Zowel ingebouwde distributies (.whl) als brondistributies (.tar.gz) worden ondersteund.
    • Pakketten met Unity Catalog-volumes kunnen worden opgegeven als dbfs:<path>bijvoorbeelddbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl.dbfs:/Volumes/users/someone@example.com/tars/my_private_dep-4.0.0.tar.gz
    • De gebruiker moet toestemming krijgen READ_FILE voor het bestand in het re:[UC]-volume. Als u deze machtiging verleent aan alle accountgebruikers, wordt dit automatisch ingeschakeld voor nieuwe gebruikers.
  • Lokale pakketten, mappen en Python-bestanden
    • Lokaal gebouwde distributies (.whl), brondistributies (.tar.gz), mappen en Python-bestanden kunnen worden opgegeven alslocal:<path>, bijvoorbeeld: local:/path/to/my_private_dep-3.20.2-py3-none-any.whl, local:/path/to/my_private_dep-4.0.0.tar.gz, local:/path/to/my_folder, . local:/path/to/my_file.py
    • Zowel absolute als relatieve paden worden ondersteund, bijvoorbeeld: local:/path/to/my_file.py of local:./path/to/my_file.py.

Als u aangepaste afhankelijkheden in uw UDF wilt opnemen, geeft u deze op in een omgeving met behulp van withDependenciesen gebruikt u die omgeving om een Spark-sessie te maken. De afhankelijkheden worden geïnstalleerd op uw Databricks-rekenproces en zijn beschikbaar in alle UDF's die gebruikmaken van deze Spark-sessie.

De volgende code declareert het PyPI-pakket dice als een afhankelijkheid:

from databricks.connect import DatabricksSession, DatabricksEnv
env = DatabricksEnv().withDependencies("dice==3.1.0")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Of als u een afhankelijkheid van een wiel in een volume wilt opgeven:

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withDependencies("dbfs:/Volumes/users/someone@example.com/wheels/my_private_dep-3.20.2-py3-none-any.whl")
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

Gedrag in Databricks-notebooks en -taken

In notebooks en banen moeten UDF-afhankelijkheden rechtstreeks in de REPL worden geïnstalleerd. Databricks Connect valideert de REPL Python-omgeving door te controleren of alle opgegeven afhankelijkheden al zijn geïnstalleerd en genereert een uitzondering als deze niet zijn geïnstalleerd. Validatie van notebookomgevingen wordt uitgevoerd voor zowel PyPI- als Unity Catalog-volumeafhankelijkheden, maar niet voor lokale afhankelijkheden.

Beperkingen

  • Ondersteuning voor UDF-afhankelijkheden voor pyspark.sql.streaming.DataStreamWriter.foreach Vereist Databricks Connect voor Python 18.0 of hoger en een cluster met Databricks Runtime 18.0 of hoger.
  • Ondersteuning voor UDF-afhankelijkheden voor pyspark.sql.streaming.DataStreamWriter.foreachBatch Vereist Databricks Connect voor Python 18.0 of hoger en een cluster met Databricks Runtime 18.0 of hoger. De functie wordt niet ondersteund op serverloos.
  • Ondersteuning voor UDF-afhankelijkheden voor lokale pakketten, mappen en Python-bestanden vereist Databricks Connect voor Python 18.1 of hoger en een cluster met Databricks Runtime 18.1 of hoger.
  • UDF-afhankelijkheden worden niet ondersteund voor pandas-aggregatie-UDF's via vensterfuncties.
  • Unity Catalog-volumespakketten en lokale pakketten moeten worden verpakt volgens de standaard Python-verpakkingsspecificaties van PEP-427 of hoger voor wiel gebouwde distributies en PEP-241 of hoger voor tar-brondistributies. Raadpleeg de PyPA-documentatie voor meer informatie over Python-verpakkingsstandaarden.

Voorbeelden

In het volgende voorbeeld worden PyPI- en volumesafhankelijkheden in een omgeving gedefinieerd, wordt een sessie met die omgeving gemaakt en vervolgens UDF's gedefinieerd en aangeroepen die gebruikmaken van deze afhankelijkheden:

from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import udf, col, pandas_udf
from pyspark.sql.types import IntegerType, LongType, StringType
import pandas as pd

pypi_deps = ["pyjokes>=0.8,<1"]

volumes_deps = [
    # Example library from: https://pypi.org/project/dice/#files
    "dbfs:/Volumes/main/someone@example.com/test/dice-4.0.0.tar.gz",
]

local_deps = [
    # Example library from: https://pypi.org/project/simplejson/#files
    "local:./test/simplejson-3.20.2-py3-none-any.whl",
]

env = DatabricksEnv().withDependencies(pypi_deps).withDependencies(volumes_deps).withDependencies(local_deps)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

# UDFs
@udf(returnType=StringType())
def get_joke():
    from pyjokes import get_joke
    return get_joke()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    import simplejson
    return simplejson.loads(simplejson.dumps(x * 2))


@pandas_udf(returnType=LongType())
def multiply_and_add_roll(a: pd.Series, b: pd.Series) -> pd.Series:
    import dice
    return a * b + dice.roll(f"1d10")[0]


df = spark.range(1, 10)
df = df.withColumns({
    "joke": get_joke(),
    "doubled": double_and_json_parse(col("id")),
    "mutliplied_with_roll": multiply_and_add_roll(col("id"), col("doubled"))
})
df.show()

Automatisch beheer van UDF-afhankelijkheden

Belangrijk

Deze functie bevindt zich in openbare preview en vereist Databricks Connect voor Python 18.1 of hoger, Python 3.12 op uw lokale computer en een cluster met Databricks Runtime 18.1 of hoger. Als u deze functie wilt gebruiken, schakelt u de preview verbeterde Python UDF's in de Unity Catalogus in uw werkruimte in.

De Databricks Connect-API withAutoDependencies() maakt automatische detectie en upload van lokale modules en openbare PyPI-afhankelijkheden mogelijk die worden gebruikt in de importinstructies in uw UDF's. Hiermee hoeft u geen afhankelijkheden handmatig op te geven.

Met de volgende code wordt automatisch afhankelijkheidsbeheer ingeschakeld:

from databricks.connect import DatabricksSession, DatabricksEnv

env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

De withAutoDependencies() methode accepteert de volgende parameters:

  • upload_local: Wanneer deze op True is ingesteld, worden lokale modules die in uw UDF's worden geïmporteerd, automatisch gedetecteerd, verpakt en geüpload naar de UDF-sandbox.
  • use_index: Wanneer deze instelling wordt ingesteld op True, worden openbare PyPI-afhankelijkheden die u in uw UDF's gebruikt automatisch gedetecteerd en geïnstalleerd op Azure Databricks-rekenkracht. Het detectieproces maakt gebruik van de geïnstalleerde pakketten op uw lokale computer om versies te bepalen, waardoor consistentie tussen uw lokale omgeving en de externe uitvoeringsomgeving wordt gegarandeerd.

Beperkingen

  • Dynamische importbewerkingen (bijvoorbeeld importlib.import_module("foo")) worden niet ondersteund.
  • Naamruimtepakketten (bijvoorbeeld azure.eventhub en google.cloud.aiplatform) worden niet ondersteund.
  • Afhankelijkheden die zijn geïnstalleerd met direct-URL-verwijzingen, worden niet ondersteund. Dit geldt ook voor de bestanden die zijn geïnstalleerd vanuit lokale wielbestanden.
  • Afhankelijkheden die zijn geïnstalleerd vanuit indexen van privépakketten, worden niet ondersteund. Pakketten die op deze manier zijn geïnstalleerd, kunnen niet worden onderscheiden van pakketten die zijn geïnstalleerd vanuit de openbare PyPI.
  • Detectie van afhankelijkheden werkt niet in een Python-shell. Alleen Python-scripts, IPython-shell en Jupyter Notebooks worden ondersteund.

Voorbeelden

In het volgende voorbeeld ziet u automatisch afhankelijkheidsbeheer met zowel lokale modules als PyPI-pakketten. Dit voorbeeld vereist dat u hebt geïnstalleerd simplejson en dice (met behulp van pip install simplejson dice).

Maak eerst lokale helpermodules:

# my_helper.py
def double(x):
    return 2 * x
# my_json.py
import simplejson

def loads(x):
    return simplejson.loads(x)

def dumps(x):
    return simplejson.dumps(x)

Importeer deze modules vervolgens in uw hoofdscript en gebruik ze in UDF's:

# main.py
import dice as dc
from databricks.connect import DatabricksSession, DatabricksEnv
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, FloatType

import my_json
from my_helper import double

env = DatabricksEnv().withAutoDependencies(upload_local=True, use_index=True)
spark = DatabricksSession.builder.withEnvironment(env).getOrCreate()

@udf(returnType=IntegerType())
def double_and_json_parse(x):
    return my_json.loads(my_json.dumps(double(x)))

@udf(returnType=FloatType())
def sum_and_add_noise(x, y):
    return x + y + (dc.roll("d6")[0] / 6)

df = spark.range(1, 10)
df = df.withColumns({
    "doubled": double_and_json_parse(col("id")),
    "summed_with_noise": sum_and_add_noise(col("id"), col("doubled")),
})
df.show()

Loggen

Als u gedetecteerde afhankelijkheden wilt uitvoeren, stelt u de SPARK_CONNECT_LOG_LEVEL omgevingsvariabele in op info of debug. U kunt ook het Python-logboekregistratieframework configureren:

import logging
logging.basicConfig(level=logging.INFO)

De relevante logboeken worden verzonden door de databricks.connect.auto_dependencies module, bijvoorbeeld:

DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_json
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered local module: my_helper
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: simplejson for module simplejson
DEBUG:databricks.connect.auto_dependencies.discovery:Discovered distribution: dice for module dice
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_json
INFO:databricks.connect.auto_dependencies.hook:Synced zip artifact for: my_helper
INFO:databricks.connect.auto_dependencies.hook:Updated simplejson with auto-detected version ==3.20.2
INFO:databricks.connect.auto_dependencies.hook:Updated dice with auto-detected version ==4.0.0

Python-basisomgeving

UDF's worden uitgevoerd op de Databricks-berekening en niet op de client. De Python-basisomgeving waarin UDF's worden uitgevoerd, is afhankelijk van de Databricks-berekening.

Voor clusters is de python-basisomgeving de Python-omgeving van de Databricks Runtime-versie die wordt uitgevoerd op het cluster. De Python-versie en de lijst met pakketten in deze basisomgeving vindt u in de secties Systeemomgeving en geïnstalleerde Python-bibliotheken van de releaseopmerkingen van Databricks Runtime.

Voor serverloze berekening komt de basisomgeving van Python overeen met de serverloze omgevingsversie volgens de volgende tabel. Databricks Connect-versies die niet in deze tabel worden vermeld, bieden nog geen ondersteuning voor serverloos of hebben het einde van de ondersteuning bereikt. Zie versieondersteuningsmatrix en Databricks Connect-versies die niet langer ondersteund worden.

Databricks Connect-versie Serverloze UDF-omgeving
18.0, Python 3.12 Versie 5
17.2 tot 17.3, Python 3.12 Versie 4
16.4.1 tot onder de 17, Python 3.12 Versie 3
15.4.10 tot onder 16, Python 3.12 Versie 3
15.4.10 tot onder 16, Python 3.11 Versie 2