Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Nota
Artikel ini membahas Databricks Connect untuk Databricks Runtime 14.1 ke atas.
Databricks Connect for Scala mendukung menjalankan fungsi yang ditentukan pengguna (UDF) pada kluster Databricks dari lingkungan pengembangan lokal Anda.
Halaman ini menjelaskan cara menjalankan fungsi yang ditentukan pengguna dengan Databricks Connect for Scala.
Untuk versi Python artikel ini, lihat Fungsi yang ditentukan pengguna di Databricks Connect for Python.
Mengunggah kelas dan JAR yang dikompilasi
Agar UDF berfungsi, kelas yang dikompilasi dan JAR harus diunggah ke kluster menggunakan addCompiledArtifacts() API.
Nota
Scala yang digunakan oleh klien harus cocok dengan versi Scala pada kluster Azure Databricks. Untuk memeriksa versi Scala dari kluster tersebut, lihat pada bagian "Lingkungan Sistem" untuk versi Databricks Runtime kluster tersebut di versi dan kompatibilitas catatan rilis Databricks Runtime.
Program Scala berikut menyiapkan UDF sederhana yang mengkuadratkan nilai dalam kolom.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val spark = getSession()
val squared = udf((x: Long) => x * x)
spark.range(3)
.withColumn("squared", squared(col("id")))
.select("squared")
.show()
}
}
def getSession(): SparkSession = {
if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
// On a Databricks cluster — reuse the active session
SparkSession.active
} else {
// Locally with Databricks Connect — upload local JARs and classes
DatabricksSession
.builder()
.addCompiledArtifacts(
Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI
)
.getOrCreate()
}
}
}
Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI menunjuk ke lokasi yang sama dengan output yang dikompilasi proyek (misalnya, target/kelas atau JAR bawaan). Semua kelas yang dikompilasi diunggah ke Databricks, bukan hanya Main.
target/scala-2.13/classes/
├── com/
│ ├── examples/
│ │ ├── Main.class
│ │ └── MyUdfs.class
│ └── utils/
│ └── Helper.class
Ketika sesi Spark sudah diinisialisasi, kelas yang dikompilasi lebih lanjut dan JAR dapat diunggah menggunakan API spark.addArtifact().
Nota
Saat mengunggah JAR, semua JAR dependensi transitif harus disertakan. API tidak melakukan deteksi otomatis dependensi transitif.
UDF dengan dependensi pihak ketiga
Jika Anda telah menambahkan dependensi Maven dalam build.sbt yang digunakan dalam UDF tetapi tidak tersedia di kluster Databricks, misalnya:
// In build.sbt
libraryDependencies += "org.apache.commons" % "commons-text" % "1.10.0"
// In your code
import org.apache.commons.text.StringEscapeUtils
// ClassNotFoundException thrown during UDF execution of this function on the server side
val escapeUdf = udf((text: String) => {
StringEscapeUtils.escapeHtml4(text)
})
Gunakan spark.addArtifact() dengan ivy:// untuk mengunduh dependensi dari Maven:
Tambahkan
oropustaka ke filebuild.sbtAndalibraryDependencies ++= Seq( "org.apache.commons" % "commons-text" % "1.10.0" % Provided, "oro" % "oro" % "2.0.8" // Required for ivy:// to work )Tambahkan artefak setelah membuat sesi dengan
addArtifact()API:def getSession(): SparkSession = { if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) { SparkSession.active } else { val spark = DatabricksSession.builder() .addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI) .getOrCreate() // Convert Maven coordinates to ivy:// format // From: "org.apache.commons" % "commons-text" % "1.10.0" // To: ivy://org.apache.commons:commons-text:1.10.0 spark.addArtifact("ivy://org.apache.commons:commons-text:1.10.0") spark } }
API Himpunan Data yang Ditik
API Himpunan Data yang Ditik memungkinkan seseorang menjalankan transformasi seperti map(), , filter()mapPartitions(), dan agregasi pada himpunan data yang dihasilkan. Mengunggah kelas yang dikompilasi dan JAR ke kluster menggunakan addCompiledArtifacts() API juga berlaku untuk ini, sehingga kode Anda harus berperilaku berbeda tergantung pada tempatnya berjalan:
- Pengembangan lokal dengan Databricks Connect: Unggah artefak ke kluster jarak jauh.
- Disebarkan pada Databricks yang berjalan pada kluster: Tidak perlu mengunggah apa pun karena kelas sudah ada.
Aplikasi Scala berikut menggunakan API map() untuk memodifikasi angka dalam kolom hasil menjadi string awalan.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
spark.range(3).map(f => s"row-$f").show()
}
}
Dependensi eksternal JAR
Jika Anda menggunakan pustaka privat atau pihak ketiga yang tidak ada di kluster:
import com.mycompany.privatelib.DataProcessor
// ClassNotFoundException thrown during UDF execution of this function on the server side
val myUdf = udf((data: String) => {
DataProcessor.process(data)
})
Unggah JAR eksternal dari folder Anda lib/ saat membuat sesi:
def getSession(): SparkSession = {
if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
SparkSession.active
} else {
val builder = DatabricksSession.builder()
.addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)
// Add all JARs from lib/ folder
val libFolder = new java.io.File("lib")
builder.addCompiledArtifacts(libFolder.toURI)
builder.getOrCreate()
}
}
Ini secara otomatis mengunggah semua JAR di direktori lib/ Anda ke Databricks saat berjalan secara lokal.
Proyek dengan beberapa modul
Dalam proyek SBT multi-modul, getClass.getProtectionDomain.getCodeSource.getLocation.toURI hanya mengembalikan lokasi modul saat ini. Jika UDF Anda menggunakan kelas dari modul lain, Anda akan mendapatkan ClassNotFoundException.
my-project/
├── module-a/ (main application)
├── module-b/ (utilities - module-a depends on this)
Gunakan getClass dari kelas di setiap modul untuk mendapatkan semua lokasinya dan mengunggahnya secara terpisah:
// In module-a/src/main/scala/Main.scala
import com.company.moduleb.DataProcessor // From module-b
def getSession(): SparkSession = {
if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
SparkSession.active
} else {
// Get location using a class FROM module-a
val moduleALocation = Main.getClass
.getProtectionDomain.getCodeSource.getLocation.toURI
// Get location using a class FROM module-b
val moduleBLocation = DataProcessor.getClass
.getProtectionDomain.getCodeSource.getLocation.toURI
DatabricksSession.builder()
.addCompiledArtifacts(moduleALocation) // Upload module-a
.addCompiledArtifacts(moduleBLocation) // Upload module-b
.getOrCreate()
}
}
Keterbatasan
- Dukungan untuk UDF pada komputasi tanpa server selalu mengikuti rilis minor awal yang sesuai dari Databricks Connect. Untuk versi yang didukung, lihat tabel kompatibilitas versi.