Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In diesem Artikel finden Sie Beispiele für benutzerdefinierte Python-Funktionen (User-Defined Functions, UDFs). Darin wird gezeigt, wie Sie UDFs registrieren, wie Sie UDFs aufrufen und welche Einschränkungen in Bezug auf die Auswertungsreihenfolge von Teilausdrücken in Spark SQL bestehen.
In Databricks Runtime 14.0 und höher können Sie benutzerdefinierte Tabellenfunktionen (User-Defined Table Functions, UDTFs) von Python verwenden, um Funktionen zu registrieren, die ganze Beziehungen anstelle von Skalarwerten zurückgeben. Siehe Benutzerdefinierte Tabellenfunktionen (User-Defined Table Functions, UDTFs) in Python.
Hinweis
In Databricks Runtime 12.2 LTS und früher werden benutzerdefinierte Funktionen von Python und Pandas (UDFs) nicht in Unity Catalog auf Computeressourcen unterstützt, die den standardmäßigen Zugriffsmodus verwenden. Skalare Python UDFs und Pandas UDFs werden in Databricks Runtime 13.3 LTS und höher für alle Zugriffsmodi unterstützt.
In Databricks Runtime 13.3 LTS und höher können Sie skalare Python-UDFs mit SQL-Syntax bei Unity Catalog registrieren. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen (UDFs, user-defined functions) in Unity Catalog.
Registrieren einer Funktion als UDF
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
Optional können Sie den Rückgabetyp Ihrer UDF festlegen. Der Standardrückgabetyp ist StringType.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Aufrufen der UDF in Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
Verwenden von UDF mit DataFrames
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Alternativ können Sie dieselbe UDF mithilfe der Anmerkungssyntax deklarieren:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Varianten mit UDF
Der PySpark-Typ für die Variante ist VariantType und die Werte sind vom Typ VariantVal. Informationen zu Varianten finden Sie unter Abfragevariantendaten.
from pyspark.sql.types import VariantType
# Return Variant
@udf(returnType = VariantType())
def toVariant(jsonString):
return VariantVal.parseJson(jsonString)
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toVariant(col("json"))).display()
+---------------+
|toVariant(json)|
+---------------+
| {"a":1}|
+---------------+
# Return Struct<Variant>
@udf(returnType = StructType([StructField("v", VariantType(), True)]))
def toStructVariant(jsonString):
return {"v": VariantVal.parseJson(jsonString)}
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toStructVariant(col("json"))).display()
+---------------------+
|toStructVariant(json)|
+---------------------+
| {"v":{"a":1}}|
+---------------------+
# Return Array<Variant>
@udf(returnType = ArrayType(VariantType()))
def toArrayVariant(jsonString):
return [VariantVal.parseJson(jsonString)]
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+--------------------+
|toArrayVariant(json)|
+--------------------+
| [{"a":1}]|
+--------------------+
# Return Map<String, Variant>
@udf(returnType = MapType(StringType(), VariantType(), True))
def toArrayVariant(jsonString):
return {"v1": VariantVal.parseJson(jsonString), "v2": VariantVal.parseJson("[" + jsonString + "]")}
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+-----------------------------+
| toArrayVariant(json)|
+-----------------------------+
|{"v2":[{"a":1}],"v1":{"a":1}}|
+-----------------------------+
Auswertungsreihenfolge und NULL-Überprüfung
Spark SQL (einschließlich SQL und der Datenrahmen- und Dataset-APIs) garantiert nicht die Reihenfolge der Auswertung von Teilausdrücken. Insbesondere werden die Eingaben eines Operators oder einer Funktion nicht zwangsläufig von links nach rechts oder in einer anderen festen Reihenfolge ausgewertet. Beispielsweise gilt für logische AND- und OR-Ausdrücke keine „Kurzschluss“-Semantik von links nach rechts.
Daher ist es gefährlich, sich auf die Nebeneffekte oder die Reihenfolge der Auswertung von booleschen Ausdrücken und die Reihenfolge von WHERE- und HAVING-Klauseln zu verlassen, da solche Ausdrücke und Klauseln während der Abfrageoptimierung und -planung neu angeordnet werden können. Insbesondere wenn eine UDF für die NULL-Überprüfung auf die Kurzschluss-Semantik in SQL angewiesen ist, gibt es keine Garantie, dass die NULL-Überprüfung vor dem Aufrufen der UDF erfolgt. Zum Beispiel
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
Diese WHERE-Klausel garantiert nicht, dass die UDF strlen nach dem Herausfiltern von NULL-Werten aufgerufen wird.
Es wird empfohlen, eine der folgenden Aktionen auszuführen, um eine ordnungsgemäße NULL-Überprüfung durchzuführen:
- Machen Sie die UDF selbst nullfähig, und führen Sie die NULL-Überprüfung innerhalb der UDF selbst durch.
- Verwenden Sie
IF- oderCASE WHEN-Ausdrücke zum Durchführen der NULL-Überprüfung, und rufen Sie die UDF in einer bedingten Verzweigung auf.
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Dienstanmeldeinformationen in Scalar Python UDFs
Skalare Python UDFs können Unity Catalog Service-Anmeldeinformationen verwenden, um sicher auf externe Clouddienste zuzugreifen. Dies ist nützlich, um Vorgänge wie cloudbasierte Tokenisierung, Verschlüsselung oder geheime Verwaltung direkt in Ihre Datentransformationen zu integrieren.
Dienstanmeldeinformationen für skalare Python-UDFs werden nur für SQL Warehouse und allgemeine Compute unterstützt.
Hinweis
Dienstanmeldeinformationen in Scalar Python UDFs erfordern Databricks Runtime 17.1 und höher.
Informationen zum Erstellen einer Dienstanmeldeinformationen finden Sie unter Erstellen von Dienstanmeldeinformationen.
Hinweis
UDF-spezifische API für Dienstanmeldeinformationen:
Verwenden Sie databricks.service_credentials.getServiceCredentialsProvider() in UDFs den Zugriff auf Dienstanmeldeinformationen.
Dies unterscheidet sich von der dbutils.credentials.getServiceCredentialsProvider() in Notizbüchern verwendeten Funktion, die in UDF-Ausführungskontexten nicht verfügbar ist.
Um auf die Dienstanmeldeinformationen zuzugreifen, verwenden Sie das databricks.service_credentials.getServiceCredentialsProvider() Dienstprogramm in Ihrer UDF-Logik, um Cloud-SDKs mit den entsprechenden Anmeldeinformationen zu initialisieren. Der gesamte Code muss im UDF-Textkörper gekapselt werden.
@udf
def use_service_credential():
from azure.mgmt.web import WebSiteManagementClient
# Assuming there is a service credential named 'testcred' set up in Unity Catalog
web_client = WebSiteManagementClient(subscription_id, credential = getServiceCredentialsProvider('testcred'))
# Use web_client to perform operations
Berechtigungen für Dienstanmeldeinformationen
Der Ersteller der UDF muss über ZUGRIFFSberechtigungen für die Unity Catalog-Dienstanmeldeinformationen verfügen.
UDFs, die in No-PE Bereich ausgeführt werden, auch als dedizierte Cluster bezeichnet, erfordern MANAGE-Berechtigungen für die Dienstanmeldeinformationen.
Standardanmeldeinformationen
Bei Verwendung in Scalar Python UDFs verwendet Databricks automatisch die Standarddienstanmeldeinformationen aus der Computeumgebungsvariable. Mit diesem Verhalten können Sie sicher auf externe Dienste verweisen, ohne Anmeldeinformationsaliasen im UDF-Code explizit zu verwalten. Siehe Angeben einer Standarddienstanmeldeinformation für eine Computeressource
Standardmäßige Unterstützung für Anmeldeinformationen ist nur in Standard- und dedizierten Zugriffsmodusclustern verfügbar. Sie ist in DBSQL nicht verfügbar.
Sie müssen das azure-identity Paket installieren, um den DefaultAzureCredential Anbieter zu verwenden. Informationen zum Installieren des Pakets finden Sie unter "Python-Bibliotheken mit Notizbuchbereich" oder "Compute-Bereichsbibliotheken".
@udf
def use_service_credential():
from azure.identity import DefaultAzureCredential
from azure.mgmt.web import WebSiteManagementClient
# DefaultAzureCredential is automatically using the default service credential for the compute
web_client_default = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)
# Use web_client to perform operations
Aufgabenausführungskontext abrufen
Verwenden Sie die TaskContext PySpark-API, um Kontextinformationen wie Die Identität des Benutzers, Clustertags, Sparkauftrags-ID und vieles mehr abzurufen. Weitere Informationen finden Sie unter Abrufen des Aufgabenkontexts in einer UDF.
Begrenzungen
Die folgenden Einschränkungen gelten für PySpark UDFs:
Dateizugriffseinschränkungen: Auf Databricks Runtime 14.2 und darunter können PySpark UDFs auf freigegebenen Clustern nicht auf Git-Ordner, Arbeitsbereichsdateien oder Unity-Katalogvolumes zugreifen.
Übertragungsvariablen: PySpark UDFs auf Standardzugriffsmodusclustern und serverlosen Compute unterstützen keine Übertragungsvariablen.
Dienstanmeldeinformationen: Dienstanmeldeinformationen sind nur in Batch Unity Catalog Python UDFs und Scalar Python UDFs verfügbar. Sie werden in den Standard-Python-UDFs des Unity-Katalogs nicht unterstützt.
Dienstanmeldeinformationen: Dienstanmeldeinformationen sind nur bei serverloser Berechnung verfügbar, wenn Serverlose Umgebung Version 3 oder höher verwendet wird. Weitere Informationen finden Sie unter Versionen der serverlosen Umgebung.
- Speicherlimit auf serverlosen Servern: PySpark UDFs auf serverlosen Compute haben eine Speichergrenze von 1 GB pro PySpark UDF. Das Überschreiten dieses Grenzwerts führt zu einem Fehler vom Typ UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT_SERVERLESS.
- Speicherbeschränkung für den Standardzugriffsmodus: PySpark UDFs im Standardzugriffsmodus haben eine Speicherbeschränkung basierend auf dem verfügbaren Speicher des ausgewählten Instanztyps. Das Überschreiten des verfügbaren Arbeitsspeichers führt zu einem Fehler vom Typ UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT.