Condividi tramite


Funzioni definite dall'utente in Databricks Connect per Scala

Nota

Questo articolo illustra Databricks Connect per Databricks Runtime 14.1 e versioni successive.

Databricks Connect per Scala supporta l'esecuzione di funzioni definite dall'utente nei cluster Databricks dall'ambiente di sviluppo locale.

Questa pagina descrive come eseguire funzioni definite dall'utente con Databricks Connect per Scala.

Per la versione Python di questo articolo, vedere Funzioni definite dall'utente in Databricks Connect per Python.

Caricare le classi compilate e i file JAR

Affinché le funzioni definite dall'utente funzionino, è necessario caricare nel cluster le classi compilate e i file JAR tramite l'API addCompiledArtifacts().

Nota

La versione di Scala utilizzata dal client deve corrispondere alla versione di Scala nel cluster Azure Databricks. Per controllare la versione di Scala del cluster, vedere la sezione "Ambiente di sistema" per la versione di Databricks Runtime del cluster in Versioni e compatibilità delle note di rilascio di Databricks Runtime.

Il programma Scala seguente configura una semplice UDF che quadra i valori in una colonna.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val spark = getSession()

    val squared = udf((x: Long) => x * x)

    spark.range(3)
      .withColumn("squared", squared(col("id")))
      .select("squared")
      .show()

    }
  }

  def getSession(): SparkSession = {
    if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
      // On a Databricks cluster — reuse the active session
      SparkSession.active
    } else {
      // Locally with Databricks Connect — upload local JARs and classes
      DatabricksSession
        .builder()
        .addCompiledArtifacts(
          Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI
        )
        .getOrCreate()
    }
  }
}

Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI punta alla stessa posizione dell'output compilato del progetto (ad esempio, target/classes o JAR compilato). Tutte le classi compilate vengono caricate in Databricks, non solo Main.

target/scala-2.13/classes/
├── com/
│   ├── examples/
│   │   ├── Main.class
│   │   └── MyUdfs.class
│   └── utils/
│       └── Helper.class

Quando la sessione Spark è già inizializzata, è possibile caricare altre classi compilate e jar usando l'API spark.addArtifact().

Nota

Quando si caricano file JAR, per il caricamento devono essere inclusi tutti i JAR di dipendenza transitiva. Le API non eseguono alcun rilevamento automatico delle dipendenze transitive.

Funzioni definite dall'utente con dipendenze di terze parti

Se è stata aggiunta una dipendenza Maven in build.sbt che viene usata in una funzione definita dall'utente ma non è disponibile nel cluster Databricks, ad esempio:

// In build.sbt
libraryDependencies += "org.apache.commons" % "commons-text" % "1.10.0"
// In your code
import org.apache.commons.text.StringEscapeUtils

// ClassNotFoundException thrown during UDF execution of this function on the server side
val escapeUdf = udf((text: String) => {
  StringEscapeUtils.escapeHtml4(text)
})

Usare spark.addArtifact() con ivy:// per scaricare le dipendenze da Maven:

  1. Aggiungere la oro libreria al build.sbt file

    libraryDependencies ++= Seq(
      "org.apache.commons" % "commons-text" % "1.10.0" % Provided,
      "oro" % "oro" % "2.0.8"  // Required for ivy:// to work
    )
    
  2. Aggiungere l'artefatto dopo aver creato la sessione con l'API addArtifact() :

    def getSession(): SparkSession = {
      if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
        SparkSession.active
      } else {
        val spark = DatabricksSession.builder()
          .addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)
          .getOrCreate()
    
        // Convert Maven coordinates to ivy:// format
        // From: "org.apache.commons" % "commons-text" % "1.10.0"
        // To:   ivy://org.apache.commons:commons-text:1.10.0
        spark.addArtifact("ivy://org.apache.commons:commons-text:1.10.0")
    
        spark
      }
    }
    

API del set di dati tipizzato

Le API del dataset tipizzato consentono di eseguire trasformazioni come map(), filter(), mapPartitions() e aggregazioni su set di dati risultanti. Anche il caricamento della classe compilata e dei file JAR nel cluster usando l'API addCompiledArtifacts() si applica a questi elementi, pertanto il codice deve comportarsi in modo diverso a seconda della posizione in cui viene eseguita:

  • Sviluppo locale con Databricks Connect: caricare artefatti nel cluster remoto.
  • Distribuito in Databricks in esecuzione nel cluster: non è necessario caricare nulla perché le classi sono già presenti.

L'applicazione Scala seguente usa l'API map() per modificare un numero in una colonna di risultato in una stringa con prefisso.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

Dipendenze JAR esterne

Se si usa una libreria privata o di terze parti che non si trova nel cluster:

import com.mycompany.privatelib.DataProcessor

// ClassNotFoundException thrown during UDF execution of this function on the server side
val myUdf = udf((data: String) => {
  DataProcessor.process(data)
})

Caricare file JAR esterni dalla lib/ cartella durante la creazione della sessione:

def getSession(): SparkSession = {
  if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
    SparkSession.active
  } else {
    val builder = DatabricksSession.builder()
      .addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)

     // Add all JARs from lib/ folder
     val libFolder = new java.io.File("lib")
     builder.addCompiledArtifacts(libFolder.toURI)


   builder.getOrCreate()
  }
}

In questo modo tutti i file JAR vengono caricati automaticamente nella directory lib/in Databricks durante l'esecuzione in locale.

Progetti con più moduli

In un progetto SBT multimodulo, getClass.getProtectionDomain.getCodeSource.getLocation.toURI restituisce solo la posizione del modulo corrente. Se la UDF usa classi di altri moduli, otterrai ClassNotFoundException.

my-project/
├── module-a/  (main application)
├── module-b/  (utilities - module-a depends on this)

Usare getClass da una classe in ciascun modulo per ottenere tutte le posizioni e caricarle separatamente.

// In module-a/src/main/scala/Main.scala
import com.company.moduleb.DataProcessor  // From module-b

def getSession(): SparkSession = {
  if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
    SparkSession.active
  } else {
    // Get location using a class FROM module-a
    val moduleALocation = Main.getClass
      .getProtectionDomain.getCodeSource.getLocation.toURI

    // Get location using a class FROM module-b
    val moduleBLocation = DataProcessor.getClass
      .getProtectionDomain.getCodeSource.getLocation.toURI

    DatabricksSession.builder()
      .addCompiledArtifacts(moduleALocation)  // Upload module-a
      .addCompiledArtifacts(moduleBLocation)  // Upload module-b
      .getOrCreate()
  }
}