Comparteix via


Funciones definidas por el usuario en Databricks Connect para Scala

Nota

En este artículo se describe Databricks Connect para Databricks Runtime 14.1 y versiones posteriores.

Databricks Connect para Scala admite la ejecución de funciones definidas por el usuario (UDF) en clústeres de Databricks desde el entorno de desarrollo local.

En esta página se describe cómo ejecutar funciones definidas por el usuario con Databricks Connect para Scala.

Para obtener la versión de Python de este artículo, consulte funciones definidas por el usuario en Databricks Connect para Python.

Cargar clase compilada y JAR

Para que las UDF funcionen, las clases compiladas y los JAR deben cargarse en el clúster mediante la addCompiledArtifacts() API.

Nota

Scala usado por el cliente debe coincidir con la versión de Scala en el clúster de Azure Databricks. Para comprobar la versión de Scala del clúster, revisa la sección "Entorno del sistema" para la versión del Databricks Runtime del clúster en las notas de la versión y compatibilidad de Databricks Runtime.

El siguiente programa de Scala define una UDF sencilla que cuadra los valores de una columna.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val spark = getSession()

    val squared = udf((x: Long) => x * x)

    spark.range(3)
      .withColumn("squared", squared(col("id")))
      .select("squared")
      .show()

    }
  }

  def getSession(): SparkSession = {
    if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
      // On a Databricks cluster — reuse the active session
      SparkSession.active
    } else {
      // Locally with Databricks Connect — upload local JARs and classes
      DatabricksSession
        .builder()
        .addCompiledArtifacts(
          Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI
        )
        .getOrCreate()
    }
  }
}

Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI apunta a la misma ubicación que la salida compilada del proyecto (por ejemplo, el destino o las clases o el ARCHIVO JAR compilado). Todas las clases compiladas se cargan en Databricks, no solo Main.

target/scala-2.13/classes/
├── com/
│   ├── examples/
│   │   ├── Main.class
│   │   └── MyUdfs.class
│   └── utils/
│       └── Helper.class

Cuando la sesión de Spark ya está inicializada, se pueden cargar más clases compiladas y JAR mediante la API de spark.addArtifact().

Nota

Al cargar JARs, se deben incluir todos los JARs de dependencia transitiva al momento de la carga. Las API no realizan ninguna detección automática de dependencias transitivas.

Funciones definidas por el usuario (UDFs) con dependencias externas

Si ha agregado una dependencia de Maven en build.sbt que se usa en una UDF, pero no está disponible en el clúster de Databricks, por ejemplo:

// In build.sbt
libraryDependencies += "org.apache.commons" % "commons-text" % "1.10.0"
// In your code
import org.apache.commons.text.StringEscapeUtils

// ClassNotFoundException thrown during UDF execution of this function on the server side
val escapeUdf = udf((text: String) => {
  StringEscapeUtils.escapeHtml4(text)
})

Use spark.addArtifact() con ivy:// para descargar dependencias de Maven:

  1. Agregar la oro biblioteca al build.sbt archivo

    libraryDependencies ++= Seq(
      "org.apache.commons" % "commons-text" % "1.10.0" % Provided,
      "oro" % "oro" % "2.0.8"  // Required for ivy:// to work
    )
    
  2. Agregue el artefacto después de crear la sesión con la addArtifact() API:

    def getSession(): SparkSession = {
      if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
        SparkSession.active
      } else {
        val spark = DatabricksSession.builder()
          .addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)
          .getOrCreate()
    
        // Convert Maven coordinates to ivy:// format
        // From: "org.apache.commons" % "commons-text" % "1.10.0"
        // To:   ivy://org.apache.commons:commons-text:1.10.0
        spark.addArtifact("ivy://org.apache.commons:commons-text:1.10.0")
    
        spark
      }
    }
    

APIs de Conjuntos de Datos Tipados

Las API de conjunto de datos con tipo permiten ejecutar transformaciones como map(), filter(), mapPartitions()y agregaciones en conjuntos de datos resultantes. La carga de la clase compilada y los JAR en el clúster mediante la addCompiledArtifacts() API también se aplica a estos, por lo que el código debe comportarse de forma diferente en función de dónde se ejecute:

  • Desarrollo local con Databricks Connect: cargue artefactos en el clúster remoto.
  • Implementado en Databricks que se ejecuta en el clúster: no es necesario cargar nada porque las clases ya están allí.

La siguiente aplicación de Scala usa la API de map() para modificar un número en una columna de resultado a una cadena con prefijo.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

Dependencias de JAR externas

Si usa una biblioteca privada o de terceros que no está en el clúster:

import com.mycompany.privatelib.DataProcessor

// ClassNotFoundException thrown during UDF execution of this function on the server side
val myUdf = udf((data: String) => {
  DataProcessor.process(data)
})

Cargue jaR externos desde la lib/ carpeta al crear la sesión:

def getSession(): SparkSession = {
  if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
    SparkSession.active
  } else {
    val builder = DatabricksSession.builder()
      .addCompiledArtifacts(Main.getClass.getProtectionDomain.getCodeSource.getLocation.toURI)

     // Add all JARs from lib/ folder
     val libFolder = new java.io.File("lib")
     builder.addCompiledArtifacts(libFolder.toURI)


   builder.getOrCreate()
  }
}

De este modo, se cargan automáticamente todos los ARCHIVOS JAR del directorio lib/en Databricks cuando se ejecutan localmente.

Proyectos con varios módulos

En un proyecto SBT de varios módulos, getClass.getProtectionDomain.getCodeSource.getLocation.toURI solo devuelve la ubicación del módulo actual. Si la UDF usa clases de otros módulos, obtendrá ClassNotFoundException.

my-project/
├── module-a/  (main application)
├── module-b/  (utilities - module-a depends on this)

Use getClass desde una clase de cada módulo para obtener toda su ubicación y cargarlas por separado:

// In module-a/src/main/scala/Main.scala
import com.company.moduleb.DataProcessor  // From module-b

def getSession(): SparkSession = {
  if (sys.env.contains("DATABRICKS_RUNTIME_VERSION")) {
    SparkSession.active
  } else {
    // Get location using a class FROM module-a
    val moduleALocation = Main.getClass
      .getProtectionDomain.getCodeSource.getLocation.toURI

    // Get location using a class FROM module-b
    val moduleBLocation = DataProcessor.getClass
      .getProtectionDomain.getCodeSource.getLocation.toURI

    DatabricksSession.builder()
      .addCompiledArtifacts(moduleALocation)  // Upload module-a
      .addCompiledArtifacts(moduleBLocation)  // Upload module-b
      .getOrCreate()
  }
}