Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Artikel ini berisi contoh fungsi Python yang ditentukan pengguna (UDF). Ini menunjukkan cara mendaftarkan UDF, cara memanggil UDF, dan menyediakan peringatan tentang urutan evaluasi subekspresi di Spark SQL.
Dalam Databricks Runtime 14.0 ke atas, Anda dapat menggunakan fungsi tabel yang ditentukan pengguna (UDTF) Python untuk mendaftarkan fungsi yang mengembalikan seluruh hubungan alih-alih nilai skalar. Lihat fungsi tabel yang ditentukan oleh pengguna (UDTF) dalam Python.
Catatan
Dalam Databricks Runtime 12.2 LTS dan di bawahnya, UDF Python dan Pandas UDF tidak didukung pada komputasi Unity Catalog yang menggunakan mode akses standar. UDF Python Scalar dan UDF Pandas didukung dalam Databricks Runtime 13.3 LTS ke atas untuk semua mode akses.
Dalam Databricks Runtime 13.3 LTS ke atas, Anda dapat mendaftarkan UDF Python skalar ke Unity Catalog menggunakan sintaks SQL. Lihat Fungsi yang didefinisikan pengguna (UDF) di Unity Catalog.
Mendaftarkan fungsi sebagai UDF
def squared(s):
return s * s
spark.udf.register("squaredWithPython", squared)
Anda dapat secara opsional mengatur jenis pengembalian UDF Anda. Jenis pengembalian default adalah StringType.
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Memanggil UDF di Spark SQL
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
Menggunakan UDF dengan DataFrames
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Atau, Anda dapat mendeklarasikan UDF yang sama dengan menggunakan sintaks anotasi:
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
Varian dengan UDF
Jenis PySpark untuk varian adalah VariantType dan nilainya berjenis VariantVal. Untuk informasi tentang varian, lihat Kueri data varian.
from pyspark.sql.types import VariantType
# Return Variant
@udf(returnType = VariantType())
def toVariant(jsonString):
return VariantVal.parseJson(jsonString)
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toVariant(col("json"))).display()
+---------------+
|toVariant(json)|
+---------------+
| {"a":1}|
+---------------+
# Return Struct<Variant>
@udf(returnType = StructType([StructField("v", VariantType(), True)]))
def toStructVariant(jsonString):
return {"v": VariantVal.parseJson(jsonString)}
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toStructVariant(col("json"))).display()
+---------------------+
|toStructVariant(json)|
+---------------------+
| {"v":{"a":1}}|
+---------------------+
# Return Array<Variant>
@udf(returnType = ArrayType(VariantType()))
def toArrayVariant(jsonString):
return [VariantVal.parseJson(jsonString)]
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+--------------------+
|toArrayVariant(json)|
+--------------------+
| [{"a":1}]|
+--------------------+
# Return Map<String, Variant>
@udf(returnType = MapType(StringType(), VariantType(), True))
def toArrayVariant(jsonString):
return {"v1": VariantVal.parseJson(jsonString), "v2": VariantVal.parseJson("[" + jsonString + "]")}
spark.range(1).select(lit('{"a" : 1}').alias("json")).select(toArrayVariant(col("json"))).display()
+-----------------------------+
| toArrayVariant(json)|
+-----------------------------+
|{"v2":[{"a":1}],"v1":{"a":1}}|
+-----------------------------+
Urutan evaluasi dan pemeriksaan nol
Spark SQL (termasuk SQL dan DataFrame dan API Dataset) tidak menjamin urutan evaluasi subekspresi. Khususnya, input dari operator atau fungsi tidak selalu dievaluasi dari kiri ke kanan atau dalam urutan tetap lainnya. Misalnya, ekspresi AND dan OR logis tidak memiliki semantik "korsleting" kiri-ke-kanan.
Oleh karena itu, akan berbahaya jika mengandalkan efek samping atau urutan evaluasi ekspresi Boolean, dan urutan klausul WHERE dan HAVING, karena ekspresi dan klausul tersebut dapat disusun ulang selama pengoptimalan dan perencanaan kueri. Secara khusus, jika UDF mengandalkan logika pemutusan singkat dalam SQL untuk pemeriksaan null, tidak ada jaminan bahwa pemeriksaan null akan dilakukan sebelum UDF dipanggil. Contohnya,
spark.udf.register("strlen", lambda s: len(s), "int")
spark.sql("select s from test1 where s is not null and strlen(s) > 1") # no guarantee
Klausul WHERE ini tidak menjamin bahwa strlen UDF akan dipanggil setelah memfilter null.
Untuk melakukan pemeriksaan null yang tepat, kami sarankan agar Anda melakukan salah satu hal berikut:
- Buat agar UDF itu sendiri agar tahu akan adanya null dan lakukan pemeriksaan null di dalam UDF itu sendiri
- Gunakan ekspresi
IFatauCASE WHENuntuk melakukan pemeriksaan null dan memanggil UDF di cabang kondisional
spark.udf.register("strlen_nullsafe", lambda s: len(s) if not s is None else -1, "int")
spark.sql("select s from test1 where s is not null and strlen_nullsafe(s) > 1") // ok
spark.sql("select s from test1 where if(s is not null, strlen(s), null) > 1") // ok
Kredensial layanan di UDF Scalar Python
UDF Scalar Python dapat menggunakan kredensial layanan Unity Catalog untuk mengakses layanan cloud eksternal dengan aman. Ini berguna untuk mengintegrasikan operasi seperti tokenisasi berbasis cloud, enkripsi, atau manajemen rahasia langsung ke dalam transformasi data Anda.
Kredensial layanan untuk UDF Python skalar hanya didukung di gudang SQL dan komputasi umum.
Catatan
Kredensial layanan di Scalar Python UDF memerlukan Databricks Runtime 17.1 ke atas.
Untuk membuat kredensial layanan, lihat Membuat kredensial layanan.
Catatan
API khusus UDF untuk kredensial layanan:
Di UDF, gunakan databricks.service_credentials.getServiceCredentialsProvider() untuk mengakses kredensial layanan.
Ini berbeda dari fungsi dbutils.credentials.getServiceCredentialsProvider() yang digunakan dalam notebook, yang tidak tersedia dalam konteks eksekusi UDF.
Untuk mengakses kredensial layanan, gunakan databricks.service_credentials.getServiceCredentialsProvider() utilitas dalam logika UDF Anda untuk menginisialisasi SDK cloud dengan kredensial yang sesuai. Semua kode harus dienkapsulasi dalam isi UDF.
@udf
def use_service_credential():
from azure.mgmt.web import WebSiteManagementClient
# Assuming there is a service credential named 'testcred' set up in Unity Catalog
web_client = WebSiteManagementClient(subscription_id, credential = getServiceCredentialsProvider('testcred'))
# Use web_client to perform operations
Izin akses layanan
Pembuat UDF harus memiliki izin ACCESS pada kredensial layanan Katalog Unity.
UDF yang berjalan dalam cakupan No-PE, juga dikenal sebagai kluster khusus, memerlukan izin MANAGE pada kredensial layanan.
Kredensial default
Ketika digunakan dalam Scalar Python UDFs, Databricks secara otomatis menggunakan kredensial layanan default dari variabel lingkungan komputasi. Perilaku ini memungkinkan Anda untuk mereferensikan layanan eksternal dengan aman tanpa secara eksplisit mengelola alias kredensial dalam kode UDF Anda. Lihat Menentukan kredensial layanan default untuk sumber daya komputasi
Dukungan kredensial default hanya tersedia di kluster mode akses Standar dan Khusus. Ini tidak tersedia di DBSQL.
Anda harus menginstal paket azure-identity untuk menggunakan penyedia DefaultAzureCredential. Untuk menginstal paket, lihat Pustaka Python dengan cakupan buku catatan atau pustaka cakupan komputasi.
@udf
def use_service_credential():
from azure.identity import DefaultAzureCredential
from azure.mgmt.web import WebSiteManagementClient
# DefaultAzureCredential is automatically using the default service credential for the compute
web_client_default = WebSiteManagementClient(DefaultAzureCredential(), subscription_id)
# Use web_client to perform operations
Dapatkan konteks eksekusi tugas
Gunakan API TaskContext PySpark untuk mendapatkan informasi konteks seperti identitas pengguna, tag klaster, ID pekerjaan spark, dan lainnya. Lihat Mendapatkan konteks tugas dalam UDF.
Batasan
Batasan berikut berlaku untuk UDF PySpark:
Pembatasan akses file: Pada Databricks Runtime 14.2 ke bawah, UDF PySpark pada kluster bersama tidak dapat mengakses folder Git, file ruang kerja, atau Volume Katalog Unity.
Variabel siaran: UDF PySpark pada kluster mode akses standar dan komputasi tanpa server tidak mendukung variabel siaran.
Kredensial layanan: Kredensial layanan hanya tersedia di Batch Unity Catalog Python UDFs dan Scalar Python UDFs. Mereka tidak didukung dalam Unity Catalog Python UDF standar.
Kredensial layanan: Kredensial layanan hanya tersedia dalam komputasi tanpa server saat menggunakan lingkungan tanpa server versi 3 atau lebih tinggi. Lihat versi lingkungan Tanpa Server.
- Batas memori tanpa server: UDF PySpark pada komputasi tanpa server memiliki batas memori 1GB per UDF PySpark. Melebihi batas ini menghasilkan kesalahan jenis UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT_SERVERLESS.
- Batas memori pada mode akses standar: UDF PySpark pada mode akses standar memiliki batas memori berdasarkan memori yang tersedia dari jenis instans yang dipilih. Melebihi memori yang tersedia menghasilkan kesalahan jenis UDF_PYSPARK_USER_CODE_ERROR. MEMORY_LIMIT.