Megosztás a következőn keresztül:


Felhasználó által definiált függvények a Databricks Csatlakozás Scalához

Feljegyzés

Ez a cikk a Databricks Runtime 14.1-hez és újabb verziókhoz készült Databricks Csatlakozás ismerteti.

Ez a cikk bemutatja, hogyan hajthat végre felhasználó által definiált függvényeket a Databricks Csatlakozás a Scalához. A Databricks Csatlakozás lehetővé teszi népszerű azonosítók, notebook-kiszolgálók és egyéni alkalmazások Azure Databricks-fürtökhöz való csatlakoztatását. A cikk Python-verziójának megtekintéséhez tekintse meg a Felhasználó által definiált függvényeket a Databricks Csatlakozás For Pythonban.

Feljegyzés

A Databricks Csatlakozás használatának megkezdése előtt be kell állítania a Databricks Csatlakozás-ügyfelet.

A Databricks Runtime 14.1 és újabb verziók esetében a Scalához készült Databricks Csatlakozás támogatja a felhasználó által definiált függvények (UDF-ek) futtatását.

Az UDF futtatásához a lefordított osztályt és az UDF által igényelt JAR-eket fel kell tölteni a fürtbe. Az addCompiledArtifacts() API-val megadhatja a feltöltendő lefordított osztály- és JAR-fájlokat.

Feljegyzés

Az ügyfél által használt Scalának meg kell egyeznie az Azure Databricks-fürt Scala-verziójával. A fürt Scala-verziójának ellenőrzéséhez tekintse meg a fürt Databricks Runtime-verziójának "System Environment" szakaszát a Databricks Runtime kiadási verzióiban és kompatibilitásában.

Az alábbi Scala program egy egyszerű UDF-t állít be, amely egy oszlop értékeit négyzetre állítja.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    def squared(x: Int): Int = x * x

    val squared_udf = udf(squared _)

    spark.range(3)
      .withColumn("squared", squared_udf(col("id")))
      .select("squared")
      .show()
  }
}

Az előző példában, mivel az UDF teljes mértékben benne Mainvan, csak a lefordított összetevő Main lesz hozzáadva. Ha az UDF más osztályokra terjed ki, vagy külső kódtárakat (pl. JAR-eket) használ, ezeket a kódtárakat is tartalmaznia kell.

Ha a Spark-munkamenet már inicializálva van, további lefordított osztályok és JAR-ek tölthetők fel az spark.addArtifact() API használatával.

Feljegyzés

A JAR-ek feltöltésekor a feltöltéshez minden tranzitív függőségi JAR-t fel kell venni. Az API-k nem végzik el az átmenő függőségek automatikus észlelését.

Gépelt adathalmaz API-k

Az UDF-ek előző szakaszában ismertetett mechanizmus a gépelt Adathalmaz API-kra is vonatkozik.

A gépelt adathalmaz API-k lehetővé teszik az átalakítások( például leképezés, szűrés és összesítések) futtatását az eredményül kapott adathalmazokon. Ezek végrehajtása a Databricks-fürtön lévő UDF-ekhez hasonlóan történik.

Az alábbi Scala-alkalmazás az map() API használatával módosítja az eredményoszlopban lévő számokat egy előtaggal rendelkező sztringre.

import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object Main {
  def main(args: Array[String]): Unit = {
    val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI

    val spark = DatabricksSession.builder()
      .addCompiledArtifacts(sourceLocation)
      .getOrCreate()

    spark.range(3).map(f => s"row-$f").show()
  }
}

Bár ez a példa az map() API-t használja, ez más típusú adathalmaz API-kra is vonatkozik, például filter(), mapPartitions()stb.