Användardefinierade skalärfunktioner – Python

Den här artikeln innehåller python-exempel på användardefinierade funktioner (UDF). Den visar hur du registrerar UDF:er, hur du anropar UDF:er och ger varningar om utvärderingsordning för underuttryck i Spark SQL.

I Databricks Runtime 14.0 och senare kan du använda Användardefinierade python-tabellfunktioner (UDF: er) för att registrera funktioner som returnerar hela relationer i stället för skalära värden. Se Användardefinierade tabellfunktioner i Python (UDF).

Anteckning

I Databricks Runtime 12.2 LTS och nedan stöds inte Python UDF:er och Pandas UDF:er på Unity Catalog-beräkning som använder standardåtkomstläge. Scalar Python UDF:er och Pandas UDF:er stöds i Databricks Runtime 13.3 LTS och senare för alla åtkomstlägen.

I Databricks Runtime 13.3 LTS och senare kan du registrera skalära Python-UDF:er till Unity Catalog med sql-syntax. Se användardefinierade funktioner (UDF: er) i Unity Catalog.

Registrera en funktion som en UDF

def squared(s):
  return s * s
spark.udf.register("squaredWithPython", squared)

Du kan också ange returtypen för din UDF. Standardreturtypen är StringType.

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Anropa UDF i Spark SQL

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

Använda UDF med DataFrames

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Du kan också deklarera samma UDF med hjälp av anteckningssyntax:

from pyspark.sql.functions import udf

@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Varianter med UDF

PySpark-typen för variant är VariantType och värdena är av typen VariantVal. Information om varianter finns i Frågevariantdata.

from pyspark.sql.types import VariantType

# Return Variant
@udf(returnType = VariantType())
def toVariant(jsonString):
  return VariantVal.parseJson(jsonString)

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toVariant(col("json"))).display()
+---------------+
|toVariant(json)|
+---------------+
|        {"a":1}|
+---------------+
# Return Struct<Variant>
@udf(returnType = StructType([StructField("v", VariantType(), True)]))
def toStructVariant(jsonString):
  return {"v": VariantVal.parseJson(jsonString)}

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toStructVariant(col("json"))).display()
+---------------------+
|toStructVariant(json)|
+---------------------+
|        {"v":{"a":1}}|
+---------------------+
# Return Array<Variant>
@udf(returnType = ArrayType(VariantType()))
def toArrayVariant(jsonString):
  return [VariantVal.parseJson(jsonString)]

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+--------------------+
|toArrayVariant(json)|
+--------------------+
|           [{"a":1}]|
+--------------------+
# Return Map<String, Variant>
@udf(returnType = MapType(StringType(), VariantType(), True))
def toArrayVariant(jsonString):
  return {"v1": VariantVal.parseJson(jsonString), "v2": VariantVal.parseJson("[" + jsonString + "]")}

spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+-----------------------------+
|         toArrayVariant(json)|
+-----------------------------+
|{"v2":[{"a":1}],"v1":{"a":1}}|
+-----------------------------+

Utvärderingsordning och nullkontroll

Spark SQL (inklusive SQL och DataFrame- och Dataset-API:et) garanterar inte utvärderingsordningen för underuttryck. I synnerhet utvärderas inte indata från en operator eller funktion nödvändigtvis från vänster till höger eller i någon annan fast ordning. Till exempel har logiska AND och OR uttryck inte semantik med 'kortslutning' från vänster till höger.

Därför är det farligt att förlita sig på effekter eller utvärderingsordningen för booleska uttryck samt ordningen på WHERE och HAVING satser, eftersom sådana uttryck och satser kan ordnas om under optimering och planering av frågor. Mer specifikt, om en UDF förlitar sig på kortslutningssemantik i SQL för null-kontroll, finns det ingen garanti för att null-kontrollen sker innan UDF anropas. Ett exempel:

spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee

Den här WHERE satsen garanterar strlen inte att UDF anropas efter filtrering av null-värden.

För att utföra rätt null-kontroll rekommenderar vi att du gör något av följande:

  • Gör UDF-funktionen medveten om null-värden och utför null-kontroll inuti själva UDF-funktionen.
  • Använd IF eller CASE WHEN uttryck för att göra null-kontrollen och anropa UDF i en villkorsstyrd gren
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1")   // ok

Tjänstautentiseringsuppgifter i skalära Python-UDF:er

Skalära Python-UDF:er kan använda autentiseringsuppgifter för Unity Catalog-tjänsten för säker åtkomst till externa molntjänster. Detta är användbart för att integrera åtgärder som molnbaserad tokenisering, kryptering eller hemlig hantering direkt i dina datatransformeringar.

Tjänstautentiseringsuppgifter för skalära Python-UDF:er stöds endast i SQL Warehouse och allmän beräkning.

Anteckning

Tjänstautentiseringsuppgifter i skalära Python-UDF:er kräver Databricks Runtime 17.1 och senare.

Information om hur du skapar en tjänstautentiseringsuppgift finns i Skapa autentiseringsuppgifter för tjänsten.

Anteckning

UDF-specifikt API för tjänstautentiseringsuppgifter:
I UDF:er använder du databricks.service_credentials.getServiceCredentialsProvider() för att komma åt autentiseringsuppgifter för tjänsten.

Detta skiljer sig från funktionen dbutils.credentials.getServiceCredentialsProvider() som används i notebook-filer, som inte är tillgänglig i UDF-körningskontexter.

Om du vill komma åt tjänstens autentiseringsuppgifter använder databricks.service_credentials.getServiceCredentialsProvider() du verktyget i din UDF-logik för att initiera moln-SDK:er med lämpliga autentiseringsuppgifter. All kod måste kapslas in i UDF-brödtexten.

@udf
def use_service_credential():
    from azure.mgmt.web import WebSiteManagementClient

    # Assuming there is a service credential named 'testcred' set up in Unity Catalog
    web_client = WebSiteManagementClient(subscription_id, credential = getServiceCredentialsProvider('testcred'))
    # Use web_client to perform operations

Tjänstautentiseringstillstånd

Skaparen av UDF måste ha åtkomstbehörighet för Unity Catalog-tjänstens autentiseringsuppgifter.

UDF:er som körs i No-PE omfång, även kallade dedikerade kluster, kräver HANTERA-behörigheter för tjänstens autentiseringsuppgifter.

Standardautentiseringsuppgifter

När databricks används i Skalär Python-UDF:er använder de automatiskt standardautentiseringsuppgifterna för tjänsten från variabeln beräkningsmiljö. Med det här beteendet kan du på ett säkert sätt referera till externa tjänster utan att uttryckligen hantera autentiseringsalias i UDF-koden. Se Ange en standardtjänstautentiseringsuppgift för en beräkningsresurs

Standardstöd för autentiseringsuppgifter är endast tillgängligt i kluster för standard- och dedikerat åtkomstläge. Den är inte tillgänglig i DBSQL.

Du måste installera azure-identity paketen för att använda DefaultAzureCredential leverantören. Information om hur du installerar paketet finns i Python-bibliotek med notebook-omfattning eller bibliotek med beräkningsomfång.

@udf
def use_service_credential():
    from azure.identity import DefaultAzureCredential
    from azure.mgmt.web import WebSiteManagementClient

    # DefaultAzureCredential is automatically using the default service credential for the compute
    web_client_default = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)

    # Use web_client to perform operations

Hämta uppgiftskörningskontext

Använd TaskContext PySpark API för att få kontextinformation såsom användarens identitet, klustertaggar, spark-jobb-ID och mer. Se Hämta aktivitetskontext i en UDF.

Begränsningar

Följande begränsningar gäller för PySpark-UDF:er:

  • Begränsningar för filåtkomst: På Databricks Runtime 14.2 och nedan kan PySpark-UDF:er på delade kluster inte komma åt Git-mappar, arbetsytefiler eller Unity-katalogvolymer.

  • Broadcast-variabler: PySpark UDF:er på standardkluster för åtkomstläge och serverlös beräkning stöder inte sändningsvariabler.

  • Tjänstautentiseringsuppgifter: Tjänstautentiseringsuppgifter är endast tillgängliga i Batch Unity Catalog Python UDF:er och scalar Python UDF:er. De stöds inte i Standard Unity Catalog Python UDF:er.

  • Tjänstautentiseringsuppgifter: Tjänstautentiseringsuppgifter är endast tillgängliga i serverlös beräkning när du använder serverlös miljöversion 3 eller senare. Se serverlösa miljöversioner.

  • Minnesgräns på serverlös: PySpark-UDF:er på serverlös beräkning har en minnesgräns på 1 GB per PySpark UDF. Om den här gränsen överskrids resulterar det i ett fel av typen UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT_SERVERLESS.
  • Minnesgräns för standardåtkomstläge: PySpark-UDF:er i standardåtkomstläge har en minnesgräns baserat på det tillgängliga minnet för den valda instanstypen. Om det tillgängliga minnet överskrids resulterar det i ett fel av typen UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT.