Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Hinweis
Dieser Artikel behandelt Databricks Connect für Databricks Runtime 14.1 und höher.
In diesem Artikel wird beschrieben, wie benutzerdefinierte Funktionen mit Databricks Connect für Scala ausgeführt werden. Mit Databricks Connect können Sie beliebte IDEs, Notebookserver und benutzerdefinierte Anwendungen mit Azure Databricks-Clustern verbinden. Die Python-Version dieses Artikels finden Sie unter Benutzerdefinierte Funktionen in Databricks Connect für Python.
Hinweis
Bevor Sie beginnen, Databricks Connect zu verwenden, müssen Sie den Databricks Connect-Client einrichten.
Für Databricks Runtime 14.1 und höher unterstützt Databricks Connect für Scala das Ausführen von benutzerdefinierten Funktionen (UDFs).
Um eine UDF auszuführen, müssen die kompilierten Klassen und JARs, die die UDF benötigt, in den Cluster hochgeladen werden.
Die addCompiledArtifacts()
API kann verwendet werden, um die kompilierten Klassen- und JAR-Dateien anzugeben, die hochgeladen werden müssen.
Hinweis
Die vom Client verwendete Skala muss mit der Scala-Version im Azure Databricks-Cluster übereinstimmen. Informationen zum Überprüfen der Scala-Version des Clusters finden Sie im Abschnitt "Systemumgebung" für die Databricks-Runtime-Version des Clusters in den Versionen und Kompatibilitätsinformationen der Databricks Runtime.
Das folgende Scala-Programm richtet eine einfache UDF ein, die Werte in einer Spalte quadriert.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
def squared(x: Int): Int = x * x
val squared_udf = udf(squared _)
spark.range(3)
.withColumn("squared", squared_udf(col("id")))
.select("squared")
.show()
}
}
Im vorherigen Beispiel wird nur das kompilierte Artefakt Main
hinzugefügt, da die UDF vollständig innerhalb von Main
enthalten ist.
Wenn sich die UDF über andere Klassen verteilt oder externe Bibliotheken (d. h. JARs) verwendet, sollten auch alle diese Bibliotheken einbezogen werden.
Wenn die Spark-Sitzung bereits initialisiert ist, können weitere kompilierte Klassen und JARs mithilfe der spark.addArtifact()
API hochgeladen werden.
Hinweis
Beim Hochladen von JARs müssen alle transitiven Abhängigkeits-JARs für den Upload eingeschlossen werden. Die APIs führen keine automatische Erkennung transitiver Abhängigkeiten durch.
Typisierte Dataset-APIs
Der gleiche Mechanismus, der im vorherigen Abschnitt für UDFs beschrieben wird, gilt auch für typisierte Dataset-APIs.
Typisierte Dataset-APIs ermöglichen eine Ausführung von Transformationen wie Zuordnung, Filter und Aggregationen für resultierende Datasets. Diese werden auch ähnlich wie UDFs im Databricks-Cluster ausgeführt.
Die folgende Scala-Anwendung verwendet die map()
API, um eine Zahl in einer Ergebnisspalte in eine vorangestellte Zeichenfolge zu ändern.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
spark.range(3).map(f => s"row-$f").show()
}
}
Während in diesem Beispiel die map()
API verwendet wird, gilt dies auch für andere typisierte Dataset-APIs wie filter()
, usw mapPartitions()
.