Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Dit artikel bevat voorbeelden van door de gebruiker gedefinieerde Python-functie (UDF). Het laat zien hoe u UDF's registreert, UDF's aanroept en u een waarschuwing geeft over de evaluatievolgorde van subexpressies in Spark SQL.
In Databricks Runtime 14.0 en hoger kunt u door de gebruiker gedefinieerde tabelfuncties (UDDF's) van Python gebruiken om functies te registreren die volledige relaties retourneren in plaats van scalaire waarden. Zie door de gebruiker gedefinieerde tabelfuncties (UDDF's) van Python.
Notitie
In Databricks Runtime 12.2 LTS en lager worden Python UDF's en Pandas UDF's niet ondersteund op Unity Catalog-rekenkracht die gebruikmaakt van de standaardtoegangsmodus. Scalaire Python UDF's en Pandas UDF's worden ondersteund in Databricks Runtime 13.3 LTS en hoger voor alle toegangsmodi.
In Databricks Runtime 13.3 LTS en hoger kunt u scalaire Python UDF's registreren bij Unity Catalog met behulp van SQL-syntaxis. Zie door de gebruiker gedefinieerde functies (UDF's) in Unity Catalog.
Een functie registreren als een UDF
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
U kunt desgewenst het retourtype van uw UDF instellen. Het standaard retourtype is StringType.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
De UDF aanroepen in Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
UDF gebruiken met DataFrames
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
U kunt ook dezelfde UDF declareren met behulp van de aantekeningsyntaxis:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Varianten met UDF
Het Type PySpark voor variant is VariantType en de waarden zijn van het type VariantVal. Zie Queryvariantgegevens voor meer informatie over varianten.
from pyspark.sql.types import VariantType
# Return Variant
@udf(returnType = VariantType())
def toVariant(jsonString):
return VariantVal.parseJson(jsonString)
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toVariant(col("json"))).display()
+---------------+
|toVariant(json)|
+---------------+
| {"a":1}|
+---------------+
# Return Struct<Variant>
@udf(returnType = StructType([StructField("v", VariantType(), True)]))
def toStructVariant(jsonString):
return {"v": VariantVal.parseJson(jsonString)}
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toStructVariant(col("json"))).display()
+---------------------+
|toStructVariant(json)|
+---------------------+
| {"v":{"a":1}}|
+---------------------+
# Return Array<Variant>
@udf(returnType = ArrayType(VariantType()))
def toArrayVariant(jsonString):
return [VariantVal.parseJson(jsonString)]
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+--------------------+
|toArrayVariant(json)|
+--------------------+
| [{"a":1}]|
+--------------------+
# Return Map<String, Variant>
@udf(returnType = MapType(StringType(), VariantType(), True))
def toArrayVariant(jsonString):
return {"v1": VariantVal.parseJson(jsonString), "v2": VariantVal.parseJson("[" + jsonString + "]")}
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+-----------------------------+
| toArrayVariant(json)|
+-----------------------------+
|{"v2":[{"a":1}],"v1":{"a":1}}|
+-----------------------------+
Evaluatievolgorde en nullcontrole
Spark SQL (inclusief SQL en de DataFrame- en Gegevensset-API) garandeert niet de volgorde van evaluatie van subexpressies. Met name worden de invoer van een operator of functie niet noodzakelijkerwijs van links naar rechts of in een andere vaste volgorde geƫvalueerd. Logische AND en OR expressies hebben geen links-naar-rechts 'kortsluiting' semantiek.
Daarom is het gevaarlijk om te vertrouwen op de bijwerkingen of de volgorde van evaluatie van Boole-expressies, en de volgorde van WHERE en HAVING componenten, omdat dergelijke expressies en componenten tijdens queryoptimalisatie en -planning opnieuw kunnen worden gerangschikt. Als een UDF afhankelijk is van kortsluitingssemantiek in SQL voor null-controle, is er geen garantie dat de null-controle plaatsvindt voordat de UDF wordt aanroepen. Bijvoorbeeld:
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
Deze WHERE component garandeert niet dat de strlen UDF wordt aangeroepen nadat null-waarden zijn uitgefilterd.
Als u de juiste null-controle wilt uitvoeren, raden we u aan een van de volgende handelingen uit te voeren:
- De UDF zelf null-aware maken en null-controle uitvoeren in de UDF zelf
- Gebruik
IFofCASE WHENexpressies om de null-controle uit te voeren en de UDF aan te roepen in een voorwaardelijke vertakking
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Servicereferenties in Scalar Python UDF's
Scalar Python UDF's kunnen referenties van de Unity Catalog-service gebruiken om veilig toegang te krijgen tot externe cloudservices. Dit is handig voor het integreren van bewerkingen zoals cloudtokenisatie, versleuteling of geheimbeheer rechtstreeks in uw gegevenstransformaties.
Servicereferenties voor scalaire Python UDF's worden alleen ondersteund in SQL Warehouse en algemene rekenkracht.
Notitie
Servicereferenties in Scalar Python UDF's vereisen Databricks Runtime 17.1 en hoger.
Zie Servicereferenties maken om een servicereferentie te creƫren.
Notitie
UDF-specifieke API voor servicereferenties:
Gebruik in UDF's databricks.service_credentials.getServiceCredentialsProvider() om toegang te krijgen tot servicereferenties.
Dit verschilt van de dbutils.credentials.getServiceCredentialsProvider() functie die wordt gebruikt in notebooks, die niet beschikbaar is in UDF-uitvoeringscontexten.
Als u toegang wilt krijgen tot de servicereferentie, gebruikt u het databricks.service_credentials.getServiceCredentialsProvider() hulpprogramma in uw UDF-logica om cloud-SDK's te initialiseren met de juiste referentie. Alle code moet worden ingekapseld in de UDF-hoofdtekst.
@udf
def use_service_credential():
from azure.mgmt.web import WebSiteManagementClient
# Assuming there is a service credential named 'testcred' set up in Unity Catalog
web_client = WebSiteManagementClient(subscription_id, credential = getServiceCredentialsProvider('testcred'))
# Use web_client to perform operations
Machtigingen voor service-inloggegevens
De maker van de UDF moet toegangsmachtigingen hebben voor de Unity Catalog-servicereferenties.
UDF's die worden uitgevoerd in No-PE bereik, ook wel toegewezen clusters genoemd, vereisen MANAGE-machtigingen voor de servicereferenties.
Standaardreferenties
Wanneer Databricks wordt gebruikt in Scalar Python UDF's, wordt automatisch de standaardservicereferentie uit de rekenomgevingsvariabele gebruikt. Met dit gedrag kunt u veilig verwijzen naar externe services zonder expliciet referenties in uw UDF-code te beheren. Zie Een standaardservicereferentie opgeven voor een rekenresource
Standaardreferentieondersteuning is alleen beschikbaar in clusters met de standaard- en toegewezen toegangsmodus. Het is niet beschikbaar in DBSQL.
U moet het azure-identity pakket installeren om de DefaultAzureCredential provider te kunnen gebruiken. Als u het pakket wilt installeren, raadpleegt u Python-bibliotheken met notebookbereik of compute-scoped bibliotheken.
@udf
def use_service_credential():
from azure.identity import DefaultAzureCredential
from azure.mgmt.web import WebSiteManagementClient
# DefaultAzureCredential is automatically using the default service credential for the compute
web_client_default = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)
# Use web_client to perform operations
Context voor taakuitvoering ophalen
Gebruik de TaskContext PySpark-API om contextinformatie op te halen, zoals de identiteit van de gebruiker, clustertags, spark-taak-id en meer. Zie Taakcontext ophalen in een UDF.
Beperkingen
De volgende beperkingen gelden voor PySpark UDF's:
Beperkingen voor bestandstoegang: In Databricks Runtime 14.2 en lager hebben PySpark UDF's op gedeelde clusters geen toegang tot Git-mappen, werkruimtebestanden of Unity Catalog-volumes.
Broadcast-variabelen: PySpark UDF's op clusters in de standaardtoegangsmodus en serverloze rekenkracht bieden geen ondersteuning voor broadcastvariabelen.
Servicereferenties: Servicereferenties zijn alleen beschikbaar in Batch Unity Catalog Python UDF's en Scalar Python UDF's. Ze worden niet ondersteund in standaard-Unity Catalog Python UDF's.
Servicereferenties: servicereferenties zijn alleen beschikbaar in serverloze berekeningen wanneer u serverloze omgevingsversie 3 of hoger gebruikt. Zie Serverloze Omgevingsversies.
- Geheugenlimiet op serverloze: PySpark UDF's op serverloze berekeningen hebben een geheugenlimiet van 1 GB per PySpark UDF. Het overschrijden van deze limiet resulteert in een fout van het type UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT_SERVERLESS.
- Geheugenlimiet voor de standaardtoegangsmodus: PySpark UDF's in de standaardtoegangsmodus hebben een geheugenlimiet op basis van het beschikbare geheugen van het gekozen exemplaartype. Het overschrijden van het beschikbare geheugen resulteert in een fout van het type UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT.