Not
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Den här artikeln innehåller python-exempel på användardefinierade funktioner (UDF). Den visar hur du registrerar UDF:er, hur du anropar UDF:er och ger varningar om utvärderingsordning för underuttryck i Spark SQL.
I Databricks Runtime 14.0 och senare kan du använda Användardefinierade python-tabellfunktioner (UDF: er) för att registrera funktioner som returnerar hela relationer i stället för skalära värden. Se Användardefinierade tabellfunktioner i Python (UDF).
Anteckning
I Databricks Runtime 12.2 LTS och nedan stöds inte Python UDF:er och Pandas UDF:er på Unity Catalog-beräkning som använder standardåtkomstläge. Scalar Python UDF:er och Pandas UDF:er stöds i Databricks Runtime 13.3 LTS och senare för alla åtkomstlägen.
I Databricks Runtime 13.3 LTS och senare kan du registrera skalära Python-UDF:er till Unity Catalog med sql-syntax. Se användardefinierade funktioner (UDF: er) i Unity Catalog.
Registrera en funktion som en UDF
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
Du kan också ange returtypen för din UDF. Standardreturtypen är StringType.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Anropa UDF i Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
Använda UDF med DataFrames
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Du kan också deklarera samma UDF med hjälp av anteckningssyntax:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Varianter med UDF
PySpark-typen för variant är VariantType och värdena är av typen VariantVal. Information om varianter finns i Frågevariantdata.
from pyspark.sql.types import VariantType
# Return Variant
@udf(returnType = VariantType())
def toVariant(jsonString):
return VariantVal.parseJson(jsonString)
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toVariant(col("json"))).display()
+---------------+
|toVariant(json)|
+---------------+
| {"a":1}|
+---------------+
# Return Struct<Variant>
@udf(returnType = StructType([StructField("v", VariantType(), True)]))
def toStructVariant(jsonString):
return {"v": VariantVal.parseJson(jsonString)}
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toStructVariant(col("json"))).display()
+---------------------+
|toStructVariant(json)|
+---------------------+
| {"v":{"a":1}}|
+---------------------+
# Return Array<Variant>
@udf(returnType = ArrayType(VariantType()))
def toArrayVariant(jsonString):
return [VariantVal.parseJson(jsonString)]
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+--------------------+
|toArrayVariant(json)|
+--------------------+
| [{"a":1}]|
+--------------------+
# Return Map<String, Variant>
@udf(returnType = MapType(StringType(), VariantType(), True))
def toArrayVariant(jsonString):
return {"v1": VariantVal.parseJson(jsonString), "v2": VariantVal.parseJson("[" + jsonString + "]")}
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+-----------------------------+
| toArrayVariant(json)|
+-----------------------------+
|{"v2":[{"a":1}],"v1":{"a":1}}|
+-----------------------------+
Utvärderingsordning och nullkontroll
Spark SQL (inklusive SQL och DataFrame- och Dataset-API:et) garanterar inte utvärderingsordningen för underuttryck. I synnerhet utvärderas inte indata från en operator eller funktion nödvändigtvis från vänster till höger eller i någon annan fast ordning. Till exempel har logiska AND och OR uttryck inte semantik med 'kortslutning' från vänster till höger.
Därför är det farligt att förlita sig på effekter eller utvärderingsordningen för booleska uttryck samt ordningen på WHERE och HAVING satser, eftersom sådana uttryck och satser kan ordnas om under optimering och planering av frågor. Mer specifikt, om en UDF förlitar sig på kortslutningssemantik i SQL för null-kontroll, finns det ingen garanti för att null-kontrollen sker innan UDF anropas. Ett exempel:
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
Den här WHERE satsen garanterar strlen inte att UDF anropas efter filtrering av null-värden.
För att utföra rätt null-kontroll rekommenderar vi att du gör något av följande:
- Gör UDF-funktionen medveten om null-värden och utför null-kontroll inuti själva UDF-funktionen.
- Använd
IFellerCASE WHENuttryck för att göra null-kontrollen och anropa UDF i en villkorsstyrd gren
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Tjänstautentiseringsuppgifter i skalära Python-UDF:er
Skalära Python-UDF:er kan använda autentiseringsuppgifter för Unity Catalog-tjänsten för säker åtkomst till externa molntjänster. Detta är användbart för att integrera åtgärder som molnbaserad tokenisering, kryptering eller hemlig hantering direkt i dina datatransformeringar.
Tjänstautentiseringsuppgifter för skalära Python-UDF:er stöds endast i SQL Warehouse och allmän beräkning.
Anteckning
Tjänstautentiseringsuppgifter i skalära Python-UDF:er kräver Databricks Runtime 17.1 och senare.
Information om hur du skapar en tjänstautentiseringsuppgift finns i Skapa autentiseringsuppgifter för tjänsten.
Anteckning
UDF-specifikt API för tjänstautentiseringsuppgifter:
I UDF:er använder du databricks.service_credentials.getServiceCredentialsProvider() för att komma åt autentiseringsuppgifter för tjänsten.
Detta skiljer sig från funktionen dbutils.credentials.getServiceCredentialsProvider() som används i notebook-filer, som inte är tillgänglig i UDF-körningskontexter.
Om du vill komma åt tjänstens autentiseringsuppgifter använder databricks.service_credentials.getServiceCredentialsProvider() du verktyget i din UDF-logik för att initiera moln-SDK:er med lämpliga autentiseringsuppgifter. All kod måste kapslas in i UDF-brödtexten.
@udf
def use_service_credential():
from azure.mgmt.web import WebSiteManagementClient
# Assuming there is a service credential named 'testcred' set up in Unity Catalog
web_client = WebSiteManagementClient(subscription_id, credential = getServiceCredentialsProvider('testcred'))
# Use web_client to perform operations
Tjänstautentiseringstillstånd
Skaparen av UDF måste ha åtkomstbehörighet för Unity Catalog-tjänstens autentiseringsuppgifter.
UDF:er som körs i No-PE omfång, även kallade dedikerade kluster, kräver HANTERA-behörigheter för tjänstens autentiseringsuppgifter.
Standardautentiseringsuppgifter
När databricks används i Skalär Python-UDF:er använder de automatiskt standardautentiseringsuppgifterna för tjänsten från variabeln beräkningsmiljö. Med det här beteendet kan du på ett säkert sätt referera till externa tjänster utan att uttryckligen hantera autentiseringsalias i UDF-koden. Se Ange en standardtjänstautentiseringsuppgift för en beräkningsresurs
Standardstöd för autentiseringsuppgifter är endast tillgängligt i kluster för standard- och dedikerat åtkomstläge. Den är inte tillgänglig i DBSQL.
Du måste installera azure-identity paketen för att använda DefaultAzureCredential leverantören. Information om hur du installerar paketet finns i Python-bibliotek med notebook-omfattning eller bibliotek med beräkningsomfång.
@udf
def use_service_credential():
from azure.identity import DefaultAzureCredential
from azure.mgmt.web import WebSiteManagementClient
# DefaultAzureCredential is automatically using the default service credential for the compute
web_client_default = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)
# Use web_client to perform operations
Hämta uppgiftskörningskontext
Använd TaskContext PySpark API för att få kontextinformation såsom användarens identitet, klustertaggar, spark-jobb-ID och mer. Se Hämta aktivitetskontext i en UDF.
Begränsningar
Följande begränsningar gäller för PySpark-UDF:er:
Begränsningar för filåtkomst: På Databricks Runtime 14.2 och nedan kan PySpark-UDF:er på delade kluster inte komma åt Git-mappar, arbetsytefiler eller Unity-katalogvolymer.
Broadcast-variabler: PySpark UDF:er på standardkluster för åtkomstläge och serverlös beräkning stöder inte sändningsvariabler.
Tjänstautentiseringsuppgifter: Tjänstautentiseringsuppgifter är endast tillgängliga i Batch Unity Catalog Python UDF:er och scalar Python UDF:er. De stöds inte i Standard Unity Catalog Python UDF:er.
Tjänstautentiseringsuppgifter: Tjänstautentiseringsuppgifter är endast tillgängliga i serverlös beräkning när du använder serverlös miljöversion 3 eller senare. Se serverlösa miljöversioner.
- Minnesgräns på serverlös: PySpark-UDF:er på serverlös beräkning har en minnesgräns på 1 GB per PySpark UDF. Om den här gränsen överskrids resulterar det i ett fel av typen UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT_SERVERLESS.
- Minnesgräns för standardåtkomstläge: PySpark-UDF:er i standardåtkomstläge har en minnesgräns baserat på det tillgängliga minnet för den valda instanstypen. Om det tillgängliga minnet överskrids resulterar det i ett fel av typen UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT.