Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Fungsi yang ditentukan pengguna (UDF) memungkinkan Anda menggunakan kembali dan berbagi kode yang memperluas fungsionalitas bawaan di Azure Databricks. Gunakan UDF untuk melakukan tugas tertentu seperti perhitungan kompleks, transformasi, atau manipulasi data kustom.
Kapan menggunakan fungsi UDF vs. Apache Spark?
Gunakan UDF untuk logika yang sulit diekspresikan dengan fungsi Apache Spark bawaan. Fungsi Apache Spark bawaan dioptimalkan untuk pemrosesan terdistribusi dan menawarkan performa yang lebih baik dalam skala besar. Untuk informasi selengkapnya, lihat Fungsi.
Databricks merekomendasikan UDF untuk kueri ad hoc, pembersihan data manual, analisis data eksploratif, dan operasi pada himpunan data kecil hingga menengah. Kasus penggunaan umum untuk UDF termasuk enkripsi data, dekripsi, hashing, penguraian JSON, dan validasi.
Gunakan metode Apache Spark untuk operasi pada himpunan data yang sangat besar dan beban kerja apa pun yang berjalan secara teratur atau terus menerus, termasuk pekerjaan ETL dan operasi streaming.
Memahami jenis UDF
Pilih jenis UDF dari tab berikut untuk melihat deskripsi, contoh, dan tautan untuk mempelajari selengkapnya.
UDF Skalar
UDF skalar beroperasi pada satu baris dan mengembalikan satu nilai hasil untuk setiap baris. Katalog Unity dapat diatur atau dilingkup sesi.
Contoh berikut menggunakan UDF skalar untuk menghitung panjang setiap nama dalam name kolom dan menambahkan nilai di kolom name_lengthbaru .
+-------+-------+
| name | score |
+-------+-------+
| alice | 10.0 |
| bob | 20.0 |
| carol | 30.0 |
| dave | 40.0 |
| eve | 50.0 |
+-------+-------+
-- Create a SQL UDF for name length
CREATE OR REPLACE FUNCTION main.test.get_name_length(name STRING)
RETURNS INT
RETURN LENGTH(name);
-- Use the UDF in a SQL query
SELECT name, main.test.get_name_length(name) AS name_length
FROM your_table;
+-------+-------+-------------+
| name | score | name_length |
+-------+-------+-------------+
| alice | 10.0 | 5 |
| bob | 20.0 | 3 |
| carol | 30.0 | 5 |
| dave | 40.0 | 4 |
| eve | 50.0 | 3 |
+-------+-------+-------------+
Untuk menerapkan ini dalam notebook Databricks menggunakan PySpark:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
@udf(returnType=IntegerType())
def get_name_length(name):
return len(name)
df = df.withColumn("name_length", get_name_length(df.name))
# Show the result
display(df)
Lihat Fungsi yang ditentukan pengguna (UDF) di Unity Catalog dan Fungsi skalar yang ditentukan pengguna - Python.
Batch Fungsi Terdefinisi Pengguna Skalar
Memproses data dalam batch sambil mempertahankan paritas baris input/output 1:1. Ini mengurangi overhead operasi baris demi baris untuk pemrosesan data skala besar. UDF batch juga mempertahankan status antar batch agar berjalan lebih efisien, menggunakan kembali sumber daya, dan menangani perhitungan kompleks yang membutuhkan konteks di seluruh gugus data.
Katalog Unity dapat diatur atau dilingkup sesi.
Katalog Unity Batch menggunakan Python UDF berikut ini untuk menghitung BMI saat memproses baris-baris secara bertahap:
+-------------+-------------+
| weight_kg | height_m |
+-------------+-------------+
| 90 | 1.8 |
| 77 | 1.6 |
| 50 | 1.5 |
+-------------+-------------+
%sql
CREATE OR REPLACE FUNCTION main.test.calculate_bmi_pandas(weight_kg DOUBLE, height_m DOUBLE)
RETURNS DOUBLE
LANGUAGE PYTHON
PARAMETER STYLE PANDAS
HANDLER 'handler_function'
AS $$
import pandas as pd
from typing import Iterator, Tuple
def handler_function(batch_iter: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for weight_series, height_series in batch_iter:
yield weight_series / (height_series ** 2)
$$;
select main.test.calculate_bmi_pandas(cast(70 as double), cast(1.8 as double));
+--------+
| BMI |
+--------+
| 27.8 |
| 30.1 |
| 22.2 |
+--------+
Lihat Fungsi yang ditentukan pengguna (UDF) di Unity Catalog dan Fungsi yang Ditentukan Pengguna (UDF) Python Batch di Unity Catalog.
UDF non-skalar
UDF non-skalar beroperasi pada seluruh himpunan data/kolom dengan rasio input/output yang fleksibel (1:N atau banyak:banyak).
Batch pandas UDF yang memiliki jangkauan sesi dapat terdiri dari jenis-jenis berikut:
- Seri ke Seri
- Iterator Seri ke Iterator Seri
- Iterator dari beberapa Seri menjadi Iterator dari Seri
- Seri ke skalar
Berikut ini adalah contoh Seri ke Seri panda UDF.
from pyspark.sql.functions import pandas_udf
import pandas as pd
df = spark.createDataFrame([(70, 1.75), (80, 1.80), (60, 1.65)], ["Weight", "Height"])
@pandas_udf("double")
def calculate_bmi_pandas(weight: pd.Series, height: pd.Series) -> pd.Series:
return weight / (height ** 2)
df.withColumn("BMI", calculate_bmi_pandas(df["Weight"], df["Height"])).display()
Lihat fungsi panda yang ditentukan pengguna.
UDAF
UDAF beroperasi pada beberapa baris dan mengembalikan satu hasil agregat. UDAF hanya terbatas pada lingkup sesi.
Contoh UDAF berikut mengagregasi skor berdasarkan panjang nama.
from pyspark.sql.functions import pandas_udf
from pyspark.sql import SparkSession
import pandas as pd
# Define a pandas UDF for aggregating scores
@pandas_udf("int")
def total_score_udf(scores: pd.Series) -> int:
return scores.sum()
# Group by name length and aggregate
result_df = (df.groupBy("name_length")
.agg(total_score_udf(df["score"]).alias("total_score")))
display(result_df)
+-------------+-------------+
| name_length | total_score |
+-------------+-------------+
| 3 | 70.0 |
| 4 | 40.0 |
| 5 | 40.0 |
+-------------+-------------+
Lihat fungsi pandas yang ditentukan pengguna untuk Python dan fungsi agregat yang ditentukan pengguna - Scala.
UDTF
UDTF mengambil satu atau beberapa argumen input dan mengembalikan beberapa baris (dan mungkin beberapa kolom) untuk setiap baris input. Katalog Unity dapat diatur atau dilingkup sesi.
UDTF berikut membuat tabel menggunakan daftar tetap dari dua argumen bilangan bulat:
CREATE OR REPLACE FUNCTION get_sum_diff(x INT, y INT)
RETURNS TABLE (sum INT, diff INT)
LANGUAGE PYTHON
HANDLER 'GetSumDiff'
AS $$
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
$$;
SELECT * FROM get_sum_diff(10, 3);
+-----+------+
| sum | diff |
+-----+------+
| 13 | 7 |
+-----+------+
Untuk menerapkan ini dalam notebook Databricks menggunakan PySpark:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
def eval(self, x: int, y: int):
yield x + y, x - y
GetSumDiff(lit(1), lit(2)).show()
Lihat Unity Catalog UDTF dan UDTF yang terlingkup sesi.
Katalog Unity diatur vs. UDF terlingkup sesi
Unity Catalog Python UDFs, Batch Unity Catalog Python UDFs, dan Unity Catalog Python UDTFs dipertahankan di Unity Catalog untuk meningkatkan tata kelola, penggunaan kembali, dan penemuan. Semua UDF lainnya berbasis sesi, yang berarti didefinisikan dalam buku catatan atau pekerjaan dan dilingkup ke SparkSession saat ini. Anda dapat menentukan dan mengakses UDF yang tercakup sesi menggunakan Scala atau Python.
Katalog Unity mengatur dan mengawasi lembar contekan UDF
UDF yang diatur oleh Unity Catalog memungkinkan fungsi kustom ditentukan, digunakan, dibagikan dengan aman, dan dikelola di seluruh lingkungan komputasi. Lihat Fungsi yang didefinisikan oleh pengguna (UDF) di Unity Catalog.
| Jenis UDF | Komputasi yang didukung | Deskripsi |
|---|---|---|
| Unity Catalog Python UDF |
|
Tentukan UDF di Python dan daftarkan di Unity Catalog untuk tata kelola. UDF skalar beroperasi pada satu baris dan mengembalikan satu nilai hasil untuk setiap baris. |
| Batch Katalog Unity Python UDF |
|
Tentukan UDF di Python dan daftarkan di Unity Catalog untuk tata kelola. Operasi batch pada beberapa nilai dan mengembalikan beberapa nilai. Mengurangi overhead pada operasi baris demi baris dalam pemrosesan data skala besar. |
| Unity Catalog Python UDTF |
|
Tentukan UDTF di Python dan daftarkan di Unity Catalog untuk tata kelola. UDTF mengambil satu atau beberapa argumen input dan mengembalikan beberapa baris (dan mungkin beberapa kolom) untuk setiap baris input. |
Panduan singkat UDF berlingkup sesi untuk komputasi yang terisolasi untuk pengguna
UDF cakupan sesi ditentukan dalam notebook atau pekerjaan dan dilingkup ke SparkSession saat ini. Anda dapat menentukan dan mengakses UDF yang tercakup sesi menggunakan Scala atau Python.
| Jenis UDF | Komputasi yang didukung | Deskripsi |
|---|---|---|
| Skalar Python |
|
UDF skalar beroperasi pada satu baris dan mengembalikan satu nilai hasil untuk setiap baris. |
| Python non-skalar |
|
UDF non-skalar meliputi pandas_udf, mapInPandas, mapInArrow, applyInPandas. UDFs Pandas menggunakan Apache Arrow untuk mentransfer data dan Pandas untuk mengolah data. UDF Panda mendukung operasi vektorisasi yang dapat sangat meningkatkan performa atas UDF skalar baris demi baris. |
| UDTF Python |
|
UDTF mengambil satu atau beberapa argumen input dan mengembalikan beberapa baris (dan mungkin beberapa kolom) untuk setiap baris input. |
| UDF Skalar Scala |
|
UDF skalar beroperasi pada satu baris dan mengembalikan satu nilai hasil untuk setiap baris. |
| Scala UDAF |
|
UDAF beroperasi pada beberapa baris dan mengembalikan satu hasil agregat. |
Pertimbangan performa
Fungsi bawaan dan UDF SQL adalah opsi yang paling efisien.
UDF Scala umumnya lebih cepat daripada UDF Python.
- UDF Scala yang tidak terisolasi berjalan di Java Virtual Machine (JVM), sehingga mengurangi beban memindahkan data masuk dan keluar dari JVM.
- UDF Scala terisolasi harus memindahkan data masuk dan keluar dari JVM, tetapi masih bisa lebih cepat daripada UDF Python karena menangani memori dengan lebih efisien.
UDF Python dan UDF panda cenderung lebih lambat daripada UDF Scala karena mereka perlu menserialisasikan data dan memindahkannya dari JVM ke penerjemah Python.
- UDF Panda hingga 100x lebih cepat daripada UDF Python karena mereka menggunakan Apache Arrow untuk mengurangi biaya serialisasi.