Fungsi skalar yang ditentukan pengguna - Python
Artikel ini berisi contoh fungsi Python yang ditentukan pengguna (UDF). Ini menunjukkan cara mendaftarkan UDF, cara memanggil UDF, dan menyediakan peringatan tentang urutan evaluasi subekspresi di Spark SQL.
Dalam Databricks Runtime 14.0 ke atas, Anda dapat menggunakan fungsi tabel yang ditentukan pengguna (UDTF) Python untuk mendaftarkan fungsi yang mengembalikan seluruh hubungan alih-alih nilai skalar. Lihat Fungsi tabel yang ditentukan pengguna (UDTF) Python.
Catatan
Dalam Databricks Runtime 12.2 LTS dan di bawahnya, UDF Python dan Pandas UDF tidak didukung pada komputasi Unity Catalog yang menggunakan mode akses bersama. UDF Scalar Python dan Pandas UDF didukung dalam Databricks Runtime 13.3 LTS ke atas untuk semua mode akses.
Dalam Databricks Runtime 13.3 LTS ke atas, Anda dapat mendaftarkan UDF Python skalar ke Unity Catalog menggunakan sintaks SQL. Lihat Fungsi yang ditentukan pengguna (UDF) di Unity Catalog.
Mendaftarkan fungsi sebagai UDF
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
Anda dapat mengatur jenis tampilan UDF Anda secara opsional. Jenis tampilan defaultnya adalah StringType
.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Memanggil UDF di Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
Menggunakan UDF dengan DataFrames
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Atau, Anda dapat mendeklarasikan UDF yang sama dengan menggunakan sintaks anotasi:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Urutan evaluasi dan pemeriksaan null
Spark SQL (termasuk SQL dan DataFrame dan API Dataset) tidak menjamin urutan evaluasi subekspresi. Khususnya, input dari operator atau fungsi tidak selalu dievaluasi dari kiri ke kanan atau dalam urutan tetap lainnya. Misalnya, ekspresi AND
dan OR
logis tidak memiliki semantik "korsleting" kiri-ke-kanan.
Oleh karena itu, akan berbahaya jika mengandalkan efek samping atau urutan evaluasi ekspresi Boolean, dan urutan klausul WHERE
dan HAVING
, karena ekspresi dan klausul tersebut dapat disusun ulang selama pengoptimalan dan perencanaan kueri. Secara khusus, jika UDF bergantung pada semantik korsleting di SQL untuk pemeriksaan null, tidak ada jaminan bahwa pemeriksaan null akan terjadi sebelum memanggil UDF. Contohnya,
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
Klausul WHERE
ini tidak menjamin strlen
UDF untuk dipanggil setelah memfilter null.
Untuk melakukan pemeriksaan null yang tepat, kami sarankan agar Anda melakukan salah satu hal berikut:
- Buat agar UDF itu sendiri agar tahu akan adanya null dan lakukan pemeriksaan null di dalam UDF itu sendiri
- Gunakan ekspresi
IF
atauCASE WHEN
untuk melakukan pemeriksaan null dan memanggil UDF di cabang kondisional
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Batasan
- UDF PySpark pada kluster bersama atau komputasi tanpa server tidak dapat mengakses folder Git, file ruang kerja, atau Volume UC untuk mengimpor modul pada Databricks Runtime 14.2 ke bawah.