Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Artikel ini berisi contoh fungsi Scala yang ditentukan pengguna (UDF). Artikel ini menunjukkan cara mendaftarkan UDF, cara memanggil UDF, dan peringatan mengenai urutan evaluasi subekspresi di Spark SQL. Lihat Fungsi skalar yang ditentukan pengguna eksternal (UDF) untuk detail selengkapnya.
Persyaratan
UDF Scala pada sumber daya komputasi yang didukung Unity Catalog dengan mode akses standar memerlukan Databricks Runtime 14.2 atau lebih tinggi.
Dukungan instans ARM untuk Scala UDF pada kluster yang mendukung Unity Catalog memerlukan Databricks Runtime 15.2 atau lebih tinggi.
Mendaftarkan fungsi sebagai UDF
val squared = (s: Long) => {
s * s
}
spark.udf.register("square", squared)
Memanggil UDF di Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, square(id) as id_squared from test
Menggunakan UDF dengan DataFrames
import org.apache.spark.sql.functions.{col, udf}
val squared = udf((s: Long) => s * s)
display(spark.range(1, 20).select(squared(col("id")) as "id_squared"))
Urutan evaluasi dan pengecekan null
SQL Spark (termasuk API SQL dan DataFrame dan Dataset) tidak menjamin urutan evaluasi subekspresi. Khususnya, input dari operator atau fungsi tidak selalu dievaluasi dari kiri ke kanan atau dalam urutan tetap lainnya. Misalnya, ekspresi AND dan OR logis tidak memiliki semantik "korsleting" kiri-ke-kanan.
Oleh karena itu, akan berbahaya jika mengandalkan efek samping atau urutan evaluasi ekspresi Boolean, dan urutan klausul WHERE dan HAVING, karena ekspresi dan klausul tersebut dapat disusun ulang selama pengoptimalan dan perencanaan kueri. Secara khusus, jika UDF mengandalkan logika pemutusan singkat dalam SQL untuk pemeriksaan null, tidak ada jaminan bahwa pemeriksaan null akan dilakukan sebelum UDF dipanggil. Contohnya,
spark.udf.register("strlen", (s: String) => s.length)
spark.sql("select s from test1 where s is not null and strlen(s) > 1") // no guarantee
Klausul WHERE ini tidak menjamin strlen UDF akan dipanggil setelah menyaring null.
Untuk melakukan pemeriksaan null yang tepat, kami sarankan agar Anda melakukan salah satu hal berikut:
- Buat agar UDF itu sendiri agar tahu akan adanya null dan lakukan pemeriksaan null di dalam UDF itu sendiri
- Gunakan ekspresi
IFatauCASE WHENuntuk melakukan pemeriksaan null dan memanggil UDF di cabang kondisional
spark.udf.register("strlen_nullsafe", (s: String) => if (s != null) s.length else -1)
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
API Himpunan Data yang Ditik
Catatan
Fitur ini didukung pada kluster yang mendukung Unity Catalog dengan mode akses standar di Databricks Runtime 15.4 ke atas.
API Himpunan Data yang Ditik memungkinkan Anda menjalankan transformasi seperti peta, filter, dan agregasi pada Himpunan Data yang dihasilkan dengan fungsi yang ditentukan pengguna.
Misalnya, aplikasi Scala berikut menggunakan API map() untuk memodifikasi angka di kolom hasil menjadi string awalan.
spark.range(3).map(f => s"row-$f").show()
Meskipun contoh ini menggunakan map() API, ini juga berlaku untuk API Himpunan Data jenis lainnya, seperti filter(), , mapPartitions(), foreach()foreachPartition(), reduce(), dan flatMap().
Fitur-fitur UDF Scala dan kompatibilitas dengan Databricks Runtime
Fitur Scala berikut memerlukan versi Databricks Runtime minimum saat digunakan pada kluster yang diaktifkan Unity Catalog dalam mode akses standar (bersama).
| Fitur | Versi Minimum Databricks Runtime |
|---|---|
| UDF skalar | Databricks Runtime 14.2 |
Dataset.map, Dataset.mapPartitionsDataset.filter, Dataset.reduce,Dataset.flatMap |
Databricks Runtime 15.4 |
KeyValueGroupedDataset.flatMapGroups, KeyValueGroupedDataset.mapGroups |
Databricks Runtime 15.4 |
(Streaming) foreachWriter Sink |
Databricks Runtime 15.4 |
(Streaming) foreachBatch |
Databricks Runtime 16.1 |
(Streaming) KeyValueGroupedDataset.flatMapGroupsWithState |
Databricks Runtime 16.2 |