Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Poznámka
Tento článek popisuje Databricks Connect pro Databricks Runtime 14.1 a novější.
Databricks Connect pro Scala podporuje spouštění uživatelem definovaných funkcí (UDF) v clusterech Databricks z místního vývojového prostředí.
Tato stránka popisuje, jak spouštět uživatelem definované funkce pomocí Databricks Connect pro Scala.
Python verzi tohoto článku najdete v Uživatelsky definované funkce v Databricks Connect pro Python.
Nahrání zkompilované třídy a souborů JAR
Aby funkce definované uživatelem fungovaly, kompilované třídy a JARy se musí nahrát do clusteru pomocí rozhraní API addCompiledArtifacts().
Poznámka
Scala používaná klientem musí odpovídat verzi Scala v clusteru Azure Databricks. Pokud chcete zkontrolovat verzi Scaly pro cluster, přečtěte si část Systémové prostředí pro verzi Databricks Runtime clusteru ve verzích a kompatibilitě s poznámkami k vydání Databricks Runtime.
Následující program v jazyce Scala nastaví jednoduchou uživatelskou definovanou funkci, která ve sloupci umocní hodnoty.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val spark = getSession()
val squared = udf((x: Long) => x * x)
spark.range(3)
.withColumn("squared", squared(col("id")))
.select("squared")
.show()
}
}
def getSession(): SparkSession = {
if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
// On a Databricks cluster — reuse the active session
SparkSession.active
} else {
// Locally with Databricks Connect — upload local JARs and classes
DatabricksSession
.builder()
.addCompiledArtifacts(
Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI
)
.getOrCreate()
}
}
}
Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI odkazuje na stejné umístění jako zkompilovaný výstup projektu (například cíl/třídy nebo sestavený soubor JAR). Všechny kompilované třídy se nahrávají do Databricks, nejen Main.
target/scala-2.13/classes/
├── com/
│ ├── examples/
│ │ ├── Main.class
│ │ └── MyUdfs.class
│ └── utils/
│ └── Helper.class
Pokud je relace Sparku již inicializována, je možné pomocí rozhraní API spark.addArtifact() nahrát další kompilované třídy a jary.
Poznámka
Při nahrávání JAR souborů je nutné zahrnout všechny přidružené tranzitivní závislosti JARů. Rozhraní API neprovádějí automatickou detekci tranzitivních závislostí.
UDF se závislostmi třetích stran
Pokud jste přidali závislost Mavenu, která se používá v build.sbt UDF, ale není dostupná v clusteru Databricks, například:
// In build.sbt
libraryDependencies += "org.apache.commons" % "commons-text" % "1.10.0"
// In your code
import org.apache.commons.text.StringEscapeUtils
// ClassNotFoundException thrown during UDF execution of this function on the server side
val escapeUdf = udf((text: String) => {
StringEscapeUtils.escapeHtml4(text)
})
Použijte spark.addArtifact() s ivy:// ke stažení závislostí z Maven.
Přidejte knihovnu do
orosouborulibraryDependencies ++= Seq( "org.apache.commons" % "commons-text" % "1.10.0" % Provided, "oro" % "oro" % "2.0.8" // Required for ivy:// to work )Po vytvoření relace pomocí rozhraní API
addArtifact()přidejte artefakt:def getSession(): SparkSession = { if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) { SparkSession.active } else { val spark = DatabricksSession.builder() .addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI) .getOrCreate() // Convert Maven coordinates to ivy:// format // From: "org.apache.commons" % "commons-text" % "1.10.0" // To: ivy://org.apache.commons:commons-text:1.10.0 spark.addArtifact("ivy://org.apache.commons:commons-text:1.10.0") spark } }
API s typováním datové sady
Rozhraní API pro typované datové sady umožňují spouštět transformace, jako jsou map(), filter() a mapPartitions(), a agregace na výsledných datových sadách. Nahrání zkompilované třídy a jars do clusteru pomocí addCompiledArtifacts() rozhraní API platí také pro tyto objekty, takže se váš kód musí chovat jinak v závislosti na tom, kde běží:
- Místní vývoj pomocí Databricks Connect: Nahrajte artefakty do vzdáleného clusteru.
- Nasazené v Databricks spuštěném v clusteru: Nemusíte nic nahrávat, protože třídy už tam jsou.
Následující aplikace Scala používá rozhraní API map() k úpravě čísla ve výsledkovém sloupci na řetězec s předponou.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
spark.range(3).map(f => s"row-$f").show()
}
}
Externí závislosti JAR
Pokud používáte privátní knihovnu nebo knihovnu třetí strany, která není v clusteru:
import com.mycompany.privatelib.DataProcessor
// ClassNotFoundException thrown during UDF execution of this function on the server side
val myUdf = udf((data: String) => {
DataProcessor.process(data)
})
Při vytváření relace nahrajte externí JAR soubory z vaší složky lib/.
def getSession(): SparkSession = {
if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
SparkSession.active
} else {
val builder = DatabricksSession.builder()
.addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)
// Add all JARs from lib/ folder
val libFolder = new java.io.File("lib")
builder.addCompiledArtifacts(libFolder.toURI)
builder.getOrCreate()
}
}
Při místním spuštění se do Databricks automaticky nahrají všechny soubory JAR ve vašem adresáři lib.
Projekty s více moduly
V projektu SBT s více moduly, getClass.getProtectionDomain.getCodeSource.getLocation.toURI vrátí pouze umístění aktuálního modulu. Pokud vaše funkce definovaná uživatelem používá třídy z jiných modulů, získáte ClassNotFoundException.
my-project/
├── module-a/ (main application)
├── module-b/ (utilities - module-a depends on this)
Pomocí getClass třídy v každém modulu získejte všechny jejich umístění a nahrajte je samostatně:
// In module-a/src/main/scala/Main.scala
import com.company.moduleb.DataProcessor // From module-b
def getSession(): SparkSession = {
if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
SparkSession.active
} else {
// Get location using a class FROM module-a
val moduleALocation = Main.getClass
.getProtectionDomain.getCodeSource.getLocation.toURI
// Get location using a class FROM module-b
val moduleBLocation = DataProcessor.getClass
.getProtectionDomain.getCodeSource.getLocation.toURI
DatabricksSession.builder()
.addCompiledArtifacts(moduleALocation) // Upload module-a
.addCompiledArtifacts(moduleBLocation) // Upload module-b
.getOrCreate()
}
}
Omezení
- Podpora UDF na bezserverových výpočtech je vždy zahájena po odpovídajícím počátečním dílčím vydání Databricks Connect. Podporované verze najdete v tabulce kompatibility verzí.