Dela via


Användardefinierade funktioner i Databricks Anslut för Scala

Kommentar

Den här artikeln beskriver Databricks Anslut för Databricks Runtime 14.1 och senare.

Den här artikeln beskriver hur du kör användardefinierade funktioner med Databricks Anslut för Scala. Med Databricks Anslut kan du ansluta populära ID:er, notebook-servrar och anpassade program till Azure Databricks-kluster. Python-versionen av den här artikeln finns i Användardefinierade funktioner i Databricks Anslut för Python.

Kommentar

Innan du börjar använda Databricks Anslut måste du konfigurera Databricks-Anslut-klienten.

För Databricks Runtime 14.1 och senare har Databricks Anslut för Scala stöd för att köra användardefinierade funktioner (UDF:er).

För att kunna köra en UDF måste den kompilerade klassen och JAR:erna som krävs för UDF laddas upp till klustret. API:et addCompiledArtifacts() kan användas för att ange den kompilerade klassen och JAR-filer som måste laddas upp.

Kommentar

Den Scala som används av klienten måste matcha Scala-versionen i Azure Databricks-klustret. Information om hur du kontrollerar klustrets Scala-version finns i avsnittet "Systemmiljö" för klustrets Databricks Runtime-version i Databricks Runtime versionsanteckningar och kompatibilitet.

Följande Scala-program konfigurerar en enkel UDF som kvadraterar värden i en kolumn.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    def squared(x: Int): Int = x * x

    val squared_udf = udf(squared _)

    spark.range(3)
      .withColumn("squared", squared_udf(col("id")))
      .select("squared")
      .show()
  }
}

I föregående exempel, eftersom UDF är helt inneslutet i Main, läggs endast den kompilerade artefakten Main till. Om UDF sprids över andra klasser eller använder externa bibliotek (d.v.s. JAR:er) bör alla dessa bibliotek också inkluderas.

När Spark-sessionen redan har initierats kan ytterligare kompilerade klasser och JAR:er laddas upp med hjälp av API:et spark.addArtifact() .

Kommentar

När du laddar upp JAR:er måste alla transitiva beroende-JAR:er inkluderas för uppladdning. API:erna utför ingen automatisk identifiering av transitiva beroenden.

Api:er för typade datauppsättningar

Samma mekanism som beskrivs i föregående avsnitt för UDF:er gäller även för typerade API:er för datauppsättningar.

Med inskrivna API:er för datauppsättningar kan en köra transformeringar som mappning, filter och aggregeringar på resulterande datauppsättningar. Dessa körs också på samma sätt som UDF:er i Databricks-klustret.

Följande Scala-program använder API:et map() för att ändra ett tal i en resultatkolumn till en prefixsträng.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

Även om det här exemplet använder API:et map() gäller detta även för andra typerade API:er för datauppsättningar, till exempel filter(), mapPartitions()osv.