Bagikan melalui


Fungsi yang ditentukan pengguna di Databricks Connect for Scala

Nota

Artikel ini membahas Databricks Connect untuk Databricks Runtime 14.1 ke atas.

Databricks Connect for Scala mendukung menjalankan fungsi yang ditentukan pengguna (UDF) pada kluster Databricks dari lingkungan pengembangan lokal Anda.

Halaman ini menjelaskan cara menjalankan fungsi yang ditentukan pengguna dengan Databricks Connect for Scala.

Untuk versi Python artikel ini, lihat Fungsi yang ditentukan pengguna di Databricks Connect for Python.

Mengunggah kelas dan JAR yang dikompilasi

Agar UDF berfungsi, kelas yang dikompilasi dan JAR harus diunggah ke kluster menggunakan addCompiledArtifacts() API.

Nota

Scala yang digunakan oleh klien harus cocok dengan versi Scala pada kluster Azure Databricks. Untuk memeriksa versi Scala dari kluster tersebut, lihat pada bagian "Lingkungan Sistem" untuk versi Databricks Runtime kluster tersebut di versi dan kompatibilitas catatan rilis Databricks Runtime.

Program Scala berikut menyiapkan UDF sederhana yang mengkuadratkan nilai dalam kolom.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val spark = getSession()

    val squared = udf((x: Long) => x * x)

    spark.range(3)
      .withColumn("squared", squared(col("id")))
      .select("squared")
      .show()

    }
  }

  def getSession(): SparkSession = {
    if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
      // On a Databricks cluster — reuse the active session
      SparkSession.active
    } else {
      // Locally with Databricks Connect — upload local JARs and classes
      DatabricksSession
        .builder()
        .addCompiledArtifacts(
          Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI
        )
        .getOrCreate()
    }
  }
}

Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI menunjuk ke lokasi yang sama dengan output yang dikompilasi proyek (misalnya, target/kelas atau JAR bawaan). Semua kelas yang dikompilasi diunggah ke Databricks, bukan hanya Main.

target/scala-2.13/classes/
├── com/
│   ├── examples/
│   │   ├── Main.class
│   │   └── MyUdfs.class
│   └── utils/
│       └── Helper.class

Ketika sesi Spark sudah diinisialisasi, kelas yang dikompilasi lebih lanjut dan JAR dapat diunggah menggunakan API spark.addArtifact().

Nota

Saat mengunggah JAR, semua JAR dependensi transitif harus disertakan. API tidak melakukan deteksi otomatis dependensi transitif.

UDF dengan dependensi pihak ketiga

Jika Anda telah menambahkan dependensi Maven dalam build.sbt yang digunakan dalam UDF tetapi tidak tersedia di kluster Databricks, misalnya:

// In build.sbt
libraryDependencies += "org.apache.commons" % "commons-text" % "1.10.0"
// In your code
import org.apache.commons.text.StringEscapeUtils

// ClassNotFoundException thrown during UDF execution of this function on the server side
val escapeUdf = udf((text: String) => {
  StringEscapeUtils.escapeHtml4(text)
})

Gunakan spark.addArtifact() dengan ivy:// untuk mengunduh dependensi dari Maven:

  1. Tambahkan oro pustaka ke file build.sbt Anda

    libraryDependencies ++= Seq(
      "org.apache.commons" % "commons-text" % "1.10.0" % Provided,
      "oro" % "oro" % "2.0.8"  // Required for ivy:// to work
    )
    
  2. Tambahkan artefak setelah membuat sesi dengan addArtifact() API:

    def getSession(): SparkSession = {
      if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
        SparkSession.active
      } else {
        val spark = DatabricksSession.builder()
          .addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)
          .getOrCreate()
    
        // Convert Maven coordinates to ivy:// format
        // From: "org.apache.commons" % "commons-text" % "1.10.0"
        // To:   ivy://org.apache.commons:commons-text:1.10.0
        spark.addArtifact("ivy://org.apache.commons:commons-text:1.10.0")
    
        spark
      }
    }
    

API Himpunan Data yang Ditik

API Himpunan Data yang Ditik memungkinkan seseorang menjalankan transformasi seperti map(), , filter()mapPartitions(), dan agregasi pada himpunan data yang dihasilkan. Mengunggah kelas yang dikompilasi dan JAR ke kluster menggunakan addCompiledArtifacts() API juga berlaku untuk ini, sehingga kode Anda harus berperilaku berbeda tergantung pada tempatnya berjalan:

  • Pengembangan lokal dengan Databricks Connect: Unggah artefak ke kluster jarak jauh.
  • Disebarkan pada Databricks yang berjalan pada kluster: Tidak perlu mengunggah apa pun karena kelas sudah ada.

Aplikasi Scala berikut menggunakan API map() untuk memodifikasi angka dalam kolom hasil menjadi string awalan.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

Dependensi eksternal JAR

Jika Anda menggunakan pustaka privat atau pihak ketiga yang tidak ada di kluster:

import com.mycompany.privatelib.DataProcessor

// ClassNotFoundException thrown during UDF execution of this function on the server side
val myUdf = udf((data: String) => {
  DataProcessor.process(data)
})

Unggah JAR eksternal dari folder Anda lib/ saat membuat sesi:

def getSession(): SparkSession = {
  if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
    SparkSession.active
  } else {
    val builder = DatabricksSession.builder()
      .addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)

     // Add all JARs from lib/ folder
     val libFolder = new java.io.File("lib")
     builder.addCompiledArtifacts(libFolder.toURI)


   builder.getOrCreate()
  }
}

Ini secara otomatis mengunggah semua JAR di direktori lib/ Anda ke Databricks saat berjalan secara lokal.

Proyek dengan beberapa modul

Dalam proyek SBT multi-modul, getClass.getProtectionDomain.getCodeSource.getLocation.toURI hanya mengembalikan lokasi modul saat ini. Jika UDF Anda menggunakan kelas dari modul lain, Anda akan mendapatkan ClassNotFoundException.

my-project/
├── module-a/  (main application)
├── module-b/  (utilities - module-a depends on this)

Gunakan getClass dari kelas di setiap modul untuk mendapatkan semua lokasinya dan mengunggahnya secara terpisah:

// In module-a/src/main/scala/Main.scala
import com.company.moduleb.DataProcessor  // From module-b

def getSession(): SparkSession = {
  if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
    SparkSession.active
  } else {
    // Get location using a class FROM module-a
    val moduleALocation = Main.getClass
      .getProtectionDomain.getCodeSource.getLocation.toURI

    // Get location using a class FROM module-b
    val moduleBLocation = DataProcessor.getClass
      .getProtectionDomain.getCodeSource.getLocation.toURI

    DatabricksSession.builder()
      .addCompiledArtifacts(moduleALocation)  // Upload module-a
      .addCompiledArtifacts(moduleBLocation)  // Upload module-b
      .getOrCreate()
  }
}

Keterbatasan

  • Dukungan untuk UDF pada komputasi tanpa server selalu mengikuti rilis minor awal yang sesuai dari Databricks Connect. Untuk versi yang didukung, lihat tabel kompatibilitas versi.