Scala için Databricks Connect'te kullanıcı tanımlı işlevler

Not

Bu makale Databricks Runtime 14.1 ve üzeri için Databricks Connect'i kapsar.

Scala için Databricks Connect , yerel geliştirme ortamınızdan Databricks kümelerinde kullanıcı tanımlı işlevlerin (UDF) çalıştırılmasını destekler.

Bu sayfada Scala için Databricks Connect ile kullanıcı tanımlı işlevlerin nasıl yürütüldiği açıklanır.

Bu makalenin Python sürümü için bkz. Pythoniçin Databricks Connect'te kullanıcı tanımlı işlevler .

Derlenmiş sınıfları ve JAR dosyalarını yükle

UDF'lerin çalışması için derlenen sınıfların ve JAR'lerin API kullanılarak addCompiledArtifacts() kümeye yüklenmesi gerekir.

Not

İstemci tarafından kullanılan Scala, Azure Databricks kümesindeki Scala sürümüyle eşleşmelidir. Kümenin Scala sürümünü kontrol etmek için, "Databricks Runtime sürüm notları ve uyumluluk" kısmında, kümenin Databricks Runtime sürümünün "Sistem Ortamı" bölümüne bakın.

Aşağıdaki Scala programı, bir sütundaki değerlerin karesini alan basit bir UDF kurar.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val spark = getSession()

    val squared = udf((x: Long) => x * x)

    spark.range(3)
      .withColumn("squared", squared(col("id")))
      .select("squared")
      .show()

    }
  }

  def getSession(): SparkSession = {
    if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
      // On a Databricks cluster — reuse the active session
      SparkSession.active
    } else {
      // Locally with Databricks Connect — upload local JARs and classes
      DatabricksSession
        .builder()
        .addCompiledArtifacts(
          Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI
        )
        .getOrCreate()
    }
  }
}

Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI projenin derlenmiş çıkışıyla aynı konumu (örneğin, hedef/sınıflar veya yerleşik JAR) gösterir. Tüm derlenmiş sınıflar yalnızca Maindeğil Databricks'e yüklenir.

target/scala-2.13/classes/
├── com/
│   ├── examples/
│   │   ├── Main.class
│   │   └── MyUdfs.class
│   └── utils/
│       └── Helper.class

Spark oturumu zaten başlatılmış olduğunda, ek derlenmiş sınıflar ve JAR dosyaları spark.addArtifact() API kullanılarak yüklenebilir.

Not

JAR dosyalarını yüklerken, tüm geçişli bağımlılık JAR dosyaları yüklemeye dahil edilmelidir. API'ler geçişli bağımlılıkları otomatik olarak algılamaz.

Üçüncü taraf bağımlılıkları olan Kullanıcı Tanımlı Fonksiyonlar (UDF'ler)

UDF'de build.sbt kullanılan ancak Databricks kümesinde bulunmayan bir Maven bağımlılığı eklediyseniz, örneğin:

// In build.sbt
libraryDependencies += "org.apache.commons" % "commons-text" % "1.10.0"
// In your code
import org.apache.commons.text.StringEscapeUtils

// ClassNotFoundException thrown during UDF execution of this function on the server side
val escapeUdf = udf((text: String) => {
  StringEscapeUtils.escapeHtml4(text)
})

spark.addArtifact() ile ivy:// kullanarak Maven'dan bağımlılıkları indirin.

  1. Dosyanıza oro kitaplığını build.sbt ekleyin

    libraryDependencies ++= Seq(
      "org.apache.commons" % "commons-text" % "1.10.0" % Provided,
      "oro" % "oro" % "2.0.8"  // Required for ivy:// to work
    )
    
  2. API ile addArtifact() oturum oluşturduktan sonra yapıtı ekleyin:

    def getSession(): SparkSession = {
      if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
        SparkSession.active
      } else {
        val spark = DatabricksSession.builder()
          .addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)
          .getOrCreate()
    
        // Convert Maven coordinates to ivy:// format
        // From: "org.apache.commons" % "commons-text" % "1.10.0"
        // To:   ivy://org.apache.commons:commons-text:1.10.0
        spark.addArtifact("ivy://org.apache.commons:commons-text:1.10.0")
    
        spark
      }
    }
    

Yazılan Veri Kümesi API'leri

Türlü Veri Kümesi API'leri, sonuçta elde edilen veri kümelerinde map(), filter(), mapPartitions() gibi dönüştürmeleri ve kümelemeleri gerçekleştirmeye olanak sağlar. DERlenmiş sınıfı ve JAR'leri API kullanarak addCompiledArtifacts() kümeye yüklemek de bunlar için geçerlidir, bu nedenle kodunuzun çalıştığı yere bağlı olarak farklı davranması gerekir:

  • Databricks Connect ile yerel geliştirme: Yapıtları uzak kümeye yükleyin.
  • Kümede çalışan Databricks'te dağıtıldı: Sınıflar zaten orada olduğundan herhangi bir şeyi karşıya yüklemeniz gerekmez.

Aşağıdaki Scala uygulaması, sonuç sütunundaki bir sayıyı ön ekli dize olarak değiştirmek için map() API'sini kullanır.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

Dış JAR bağımlılıkları

Kümede olmayan bir özel veya üçüncü taraf kitaplığı kullanıyorsanız:

import com.mycompany.privatelib.DataProcessor

// ClassNotFoundException thrown during UDF execution of this function on the server side
val myUdf = udf((data: String) => {
  DataProcessor.process(data)
})

Oturumu oluştururken klasörünüzden lib/ dış JAR'leri yükleyin.

def getSession(): SparkSession = {
  if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
    SparkSession.active
  } else {
    val builder = DatabricksSession.builder()
      .addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)

     // Add all JARs from lib/ folder
     val libFolder = new java.io.File("lib")
     builder.addCompiledArtifacts(libFolder.toURI)


   builder.getOrCreate()
  }
}

Bu, yerel olarak çalışırken lib/ dizininizdeki tüm JAR'leri Databricks'e otomatik olarak yükler.

Birden çok modüle sahip projeler

Çok modüllü bir SBT projesinde yalnızca getClass.getProtectionDomain.getCodeSource.getLocation.toURI geçerli modülün konumunu döndürür. UDF'niz diğer modüllerdeki sınıfları kullanıyorsa alırsınız ClassNotFoundException.

my-project/
├── module-a/  (main application)
├── module-b/  (utilities - module-a depends on this)

Tüm konumlarını almak ve ayrı olarak yüklemek için her modüldeki bir sınıftan kullanın getClass :

// In module-a/src/main/scala/Main.scala
import com.company.moduleb.DataProcessor  // From module-b

def getSession(): SparkSession = {
  if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
    SparkSession.active
  } else {
    // Get location using a class FROM module-a
    val moduleALocation = Main.getClass
      .getProtectionDomain.getCodeSource.getLocation.toURI

    // Get location using a class FROM module-b
    val moduleBLocation = DataProcessor.getClass
      .getProtectionDomain.getCodeSource.getLocation.toURI

    DatabricksSession.builder()
      .addCompiledArtifacts(moduleALocation)  // Upload module-a
      .addCompiledArtifacts(moduleBLocation)  // Upload module-b
      .getOrCreate()
  }
}

Sınırlamalar

  • Sunucusuz işlemde UDF desteği her zaman Databricks Connect'in ilk karşılık gelen ikincil sürümünü izler. Desteklenen sürümler için sürüm uyumluluk tablosuna bakın.