Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Not
Bu makale Databricks Runtime 14.1 ve üzeri için Databricks Connect'i kapsar.
Scala için Databricks Connect , yerel geliştirme ortamınızdan Databricks kümelerinde kullanıcı tanımlı işlevlerin (UDF) çalıştırılmasını destekler.
Bu sayfada Scala için Databricks Connect ile kullanıcı tanımlı işlevlerin nasıl yürütüldiği açıklanır.
Bu makalenin Python sürümü için bkz. Pythoniçin Databricks Connect'te kullanıcı tanımlı işlevler
Derlenmiş sınıfları ve JAR dosyalarını yükle
UDF'lerin çalışması için derlenen sınıfların ve JAR'lerin API kullanılarak addCompiledArtifacts() kümeye yüklenmesi gerekir.
Not
İstemci tarafından kullanılan Scala, Azure Databricks kümesindeki Scala sürümüyle eşleşmelidir. Kümenin Scala sürümünü kontrol etmek için, "Databricks Runtime sürüm notları ve uyumluluk" kısmında, kümenin Databricks Runtime sürümünün "Sistem Ortamı" bölümüne bakın.
Aşağıdaki Scala programı, bir sütundaki değerlerin karesini alan basit bir UDF kurar.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val spark = getSession()
val squared = udf((x: Long) => x * x)
spark.range(3)
.withColumn("squared", squared(col("id")))
.select("squared")
.show()
}
}
def getSession(): SparkSession = {
if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
// On a Databricks cluster — reuse the active session
SparkSession.active
} else {
// Locally with Databricks Connect — upload local JARs and classes
DatabricksSession
.builder()
.addCompiledArtifacts(
Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI
)
.getOrCreate()
}
}
}
Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI projenin derlenmiş çıkışıyla aynı konumu (örneğin, hedef/sınıflar veya yerleşik JAR) gösterir. Tüm derlenmiş sınıflar yalnızca Maindeğil Databricks'e yüklenir.
target/scala-2.13/classes/
├── com/
│ ├── examples/
│ │ ├── Main.class
│ │ └── MyUdfs.class
│ └── utils/
│ └── Helper.class
Spark oturumu zaten başlatılmış olduğunda, ek derlenmiş sınıflar ve JAR dosyaları spark.addArtifact() API kullanılarak yüklenebilir.
Not
JAR dosyalarını yüklerken, tüm geçişli bağımlılık JAR dosyaları yüklemeye dahil edilmelidir. API'ler geçişli bağımlılıkları otomatik olarak algılamaz.
Üçüncü taraf bağımlılıkları olan Kullanıcı Tanımlı Fonksiyonlar (UDF'ler)
UDF'de build.sbt kullanılan ancak Databricks kümesinde bulunmayan bir Maven bağımlılığı eklediyseniz, örneğin:
// In build.sbt
libraryDependencies += "org.apache.commons" % "commons-text" % "1.10.0"
// In your code
import org.apache.commons.text.StringEscapeUtils
// ClassNotFoundException thrown during UDF execution of this function on the server side
val escapeUdf = udf((text: String) => {
StringEscapeUtils.escapeHtml4(text)
})
spark.addArtifact() ile ivy:// kullanarak Maven'dan bağımlılıkları indirin.
Dosyanıza
orokitaplığınıbuild.sbtekleyinlibraryDependencies ++= Seq( "org.apache.commons" % "commons-text" % "1.10.0" % Provided, "oro" % "oro" % "2.0.8" // Required for ivy:// to work )API ile
addArtifact()oturum oluşturduktan sonra yapıtı ekleyin:def getSession(): SparkSession = { if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) { SparkSession.active } else { val spark = DatabricksSession.builder() .addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI) .getOrCreate() // Convert Maven coordinates to ivy:// format // From: "org.apache.commons" % "commons-text" % "1.10.0" // To: ivy://org.apache.commons:commons-text:1.10.0 spark.addArtifact("ivy://org.apache.commons:commons-text:1.10.0") spark } }
Yazılan Veri Kümesi API'leri
Türlü Veri Kümesi API'leri, sonuçta elde edilen veri kümelerinde map(), filter(), mapPartitions() gibi dönüştürmeleri ve kümelemeleri gerçekleştirmeye olanak sağlar. DERlenmiş sınıfı ve JAR'leri API kullanarak addCompiledArtifacts() kümeye yüklemek de bunlar için geçerlidir, bu nedenle kodunuzun çalıştığı yere bağlı olarak farklı davranması gerekir:
- Databricks Connect ile yerel geliştirme: Yapıtları uzak kümeye yükleyin.
- Kümede çalışan Databricks'te dağıtıldı: Sınıflar zaten orada olduğundan herhangi bir şeyi karşıya yüklemeniz gerekmez.
Aşağıdaki Scala uygulaması, sonuç sütunundaki bir sayıyı ön ekli dize olarak değiştirmek için map() API'sini kullanır.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
spark.range(3).map(f => s"row-$f").show()
}
}
Dış JAR bağımlılıkları
Kümede olmayan bir özel veya üçüncü taraf kitaplığı kullanıyorsanız:
import com.mycompany.privatelib.DataProcessor
// ClassNotFoundException thrown during UDF execution of this function on the server side
val myUdf = udf((data: String) => {
DataProcessor.process(data)
})
Oturumu oluştururken klasörünüzden lib/ dış JAR'leri yükleyin.
def getSession(): SparkSession = {
if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
SparkSession.active
} else {
val builder = DatabricksSession.builder()
.addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)
// Add all JARs from lib/ folder
val libFolder = new java.io.File("lib")
builder.addCompiledArtifacts(libFolder.toURI)
builder.getOrCreate()
}
}
Bu, yerel olarak çalışırken lib/ dizininizdeki tüm JAR'leri Databricks'e otomatik olarak yükler.
Birden çok modüle sahip projeler
Çok modüllü bir SBT projesinde yalnızca getClass.getProtectionDomain.getCodeSource.getLocation.toURI geçerli modülün konumunu döndürür. UDF'niz diğer modüllerdeki sınıfları kullanıyorsa alırsınız ClassNotFoundException.
my-project/
├── module-a/ (main application)
├── module-b/ (utilities - module-a depends on this)
Tüm konumlarını almak ve ayrı olarak yüklemek için her modüldeki bir sınıftan kullanın getClass :
// In module-a/src/main/scala/Main.scala
import com.company.moduleb.DataProcessor // From module-b
def getSession(): SparkSession = {
if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
SparkSession.active
} else {
// Get location using a class FROM module-a
val moduleALocation = Main.getClass
.getProtectionDomain.getCodeSource.getLocation.toURI
// Get location using a class FROM module-b
val moduleBLocation = DataProcessor.getClass
.getProtectionDomain.getCodeSource.getLocation.toURI
DatabricksSession.builder()
.addCompiledArtifacts(moduleALocation) // Upload module-a
.addCompiledArtifacts(moduleBLocation) // Upload module-b
.getOrCreate()
}
}
Sınırlamalar
- Sunucusuz işlemde UDF desteği her zaman Databricks Connect'in ilk karşılık gelen ikincil sürümünü izler. Desteklenen sürümler için sürüm uyumluluk tablosuna bakın.