Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In diesem Artikel finden Sie Beispiele für benutzerdefinierte Scala-Funktionen (User-Defined Functions, UDFs). Darin wird gezeigt, wie Sie UDFs registrieren, wie Sie UDFs aufrufen und welche Einschränkungen in Bezug auf die Auswertungsreihenfolge von Teilausdrücken in Spark SQL bestehen. Weitere Informationen finden Sie unter Externe benutzerdefinierte skalare Funktionen (UDFs, user-defined scalar functions).
Hinweis
Scala UDFs auf Unity Catalog-fähigen Computeressourcen mit Standardzugriffsmodus (vormals gemeinsam genutzter Zugriff) erfordert Databricks Runtime 14.2 und höher.
Registrieren einer Funktion als UDF
val squared = (s: Long) => {
s * s
}
spark.udf.register("square", squared)
Aufrufen der UDF in Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, square(id) as id_squared from test
Verwenden von UDFs mit Datenrahmen
import org.apache.spark.sql.functions.{col, udf}
val squared = udf((s: Long) => s * s)
display(spark.range(1, 20).select(squared(col("id")) as "id_squared"))
Auswertungsreihenfolge und NULL-Überprüfung
Spark SQL (einschließlich SQL und der Datenrahmen- und Dataset-APIs) garantiert nicht die Reihenfolge der Auswertung von Teilausdrücken. Insbesondere werden die Eingaben eines Operators oder einer Funktion nicht zwangsläufig von links nach rechts oder in einer anderen festen Reihenfolge ausgewertet. Beispielsweise gilt für logische AND
- und OR
-Ausdrücke keine „Kurzschluss“-Semantik von links nach rechts.
Daher ist es gefährlich, sich auf die Nebeneffekte oder die Reihenfolge der Auswertung von booleschen Ausdrücken und die Reihenfolge von WHERE
- und HAVING
-Klauseln zu verlassen, da solche Ausdrücke und Klauseln während der Abfrageoptimierung und -planung neu angeordnet werden können. Wenn eine UDF insbesondere auf Kurzschlusssemantik in SQL für die Nullüberprüfung angewiesen ist, besteht keine Garantie dafür, dass die NULL-Überprüfung erfolgt, bevor sie die UDF aufruft. Ein auf ein Objekt angewendeter
spark.udf.register("strlen", (s: String) => s.length)
spark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee
Diese WHERE
-Klausel garantiert nicht, dass die UDF strlen
nach dem Herausfiltern von NULL-Werten aufgerufen wird.
Es wird empfohlen, eine der folgenden Aktionen auszuführen, um eine ordnungsgemäße NULL-Überprüfung durchzuführen:
- Machen Sie die UDF selbst nullfähig, und führen Sie die NULL-Überprüfung innerhalb der UDF selbst durch.
- Verwenden Sie
IF
- oderCASE WHEN
-Ausdrücke zum Durchführen der NULL-Überprüfung, und rufen Sie die UDF in einer bedingten Verzweigung auf.
spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Typisierte Dataset-APIs
Hinweis
Dieses Feature wird für Unity Catalog-fähige Cluster mit Standardzugriffsmodus in Databricks Runtime 15.4 und höher unterstützt.
Typisierte Dataset-APIs ermöglichen es Ihnen, Transformationen wie Map, Filter und Aggregationen auf resultierenden Datasets mit einer benutzerdefinierten Funktion auszuführen.
Beispielsweise verwendet die folgende Scala-Anwendung die map()
-API, um eine Zahl in einer Ergebnisspalte in eine vorangestellte Zeichenfolge zu ändern.
spark.range(3).map(f => s"row-$f").show()
Während in diesem Beispiel die map()
-API verwendet wird, gilt dies auch für andere typisierte Dataset-APIs wie filter()
, mapPartitions()
, foreach()
, foreachPartition()
, reduce()
und flatMap()
.
Scala UDF-Features und Databricks-Runtime-Kompatibilität
Die folgenden Scala-Features erfordern mindeste Databricks-Runtime-Versionen, wenn sie in Unity Catalog-aktivierten Clustern im Standardzugriffsmodus (gemeinsam genutzt) verwendet werden.
Merkmal | Minimimum Databricks Runtime-Version |
---|---|
Benutzerdefinierte Skalarfunktionen | Databricks Runtime 14.2 |
Dataset.map , , Dataset.mapPartitions Dataset.filter , , Dataset.reduce Dataset.flatMap |
Databricks Runtime 15.4 |
KeyValueGroupedDataset.flatMapGroups , KeyValueGroupedDataset.mapGroups |
Databricks Runtime 15.4 |
(Streaming) foreachWriter Sink |
Databricks Runtime 15.4 |
(Streaming) foreachBatch |
Databricks Runtime 16.1 |
(Streaming) KeyValueGroupedDataset.flatMapGroupsWithState |
Databricks Runtime 16.2 |