Felhasználó által definiált függvények a Databricks Csatlakozás Scalához
Feljegyzés
Ez a cikk a Databricks Runtime 14.1-hez és újabb verziókhoz készült Databricks Csatlakozás ismerteti.
Ez a cikk bemutatja, hogyan hajthat végre felhasználó által definiált függvényeket a Databricks Csatlakozás a Scalához. A Databricks Csatlakozás lehetővé teszi népszerű azonosítók, notebook-kiszolgálók és egyéni alkalmazások Azure Databricks-fürtökhöz való csatlakoztatását. A cikk Python-verziójának megtekintéséhez tekintse meg a Felhasználó által definiált függvényeket a Databricks Csatlakozás For Pythonban.
Feljegyzés
A Databricks Csatlakozás használatának megkezdése előtt be kell állítania a Databricks Csatlakozás-ügyfelet.
A Databricks Runtime 14.1 és újabb verziók esetében a Scalához készült Databricks Csatlakozás támogatja a felhasználó által definiált függvények (UDF-ek) futtatását.
Az UDF futtatásához a lefordított osztályt és az UDF által igényelt JAR-eket fel kell tölteni a fürtbe.
Az addCompiledArtifacts()
API-val megadhatja a feltöltendő lefordított osztály- és JAR-fájlokat.
Feljegyzés
Az ügyfél által használt Scalának meg kell egyeznie az Azure Databricks-fürt Scala-verziójával. A fürt Scala-verziójának ellenőrzéséhez tekintse meg a fürt Databricks Runtime-verziójának "System Environment" szakaszát a Databricks Runtime kiadási verzióiban és kompatibilitásában.
Az alábbi Scala program egy egyszerű UDF-t állít be, amely egy oszlop értékeit négyzetre állítja.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
def squared(x: Int): Int = x * x
val squared_udf = udf(squared _)
spark.range(3)
.withColumn("squared", squared_udf(col("id")))
.select("squared")
.show()
}
}
Az előző példában, mivel az UDF teljes mértékben benne Main
van, csak a lefordított összetevő Main
lesz hozzáadva.
Ha az UDF más osztályokra terjed ki, vagy külső kódtárakat (pl. JAR-eket) használ, ezeket a kódtárakat is tartalmaznia kell.
Ha a Spark-munkamenet már inicializálva van, további lefordított osztályok és JAR-ek tölthetők fel az spark.addArtifact()
API használatával.
Feljegyzés
A JAR-ek feltöltésekor a feltöltéshez minden tranzitív függőségi JAR-t fel kell venni. Az API-k nem végzik el az átmenő függőségek automatikus észlelését.
Gépelt adathalmaz API-k
Az UDF-ek előző szakaszában ismertetett mechanizmus a gépelt Adathalmaz API-kra is vonatkozik.
A gépelt adathalmaz API-k lehetővé teszik az átalakítások( például leképezés, szűrés és összesítések) futtatását az eredményül kapott adathalmazokon. Ezek végrehajtása a Databricks-fürtön lévő UDF-ekhez hasonlóan történik.
Az alábbi Scala-alkalmazás az map()
API használatával módosítja az eredményoszlopban lévő számokat egy előtaggal rendelkező sztringre.
import com.databricks.connect.DatabricksSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
object Main {
def main(args: Array[String]): Unit = {
val sourceLocation = getClass.getProtectionDomain.getCodeSource.getLocation.toURI
val spark = DatabricksSession.builder()
.addCompiledArtifacts(sourceLocation)
.getOrCreate()
spark.range(3).map(f => s"row-$f").show()
}
}
Bár ez a példa az map()
API-t használja, ez más típusú adathalmaz API-kra is vonatkozik, például filter()
, mapPartitions()
stb.