Apa itu fungsi tabel yang ditentukan pengguna Python?
Penting
Fitur ini ada di Pratinjau Publik.
Fungsi tabel yang ditentukan pengguna (UDTF) memungkinkan Anda mendaftarkan fungsi yang mengembalikan tabel alih-alih nilai skalar. UDTF berfungsi mirip dengan ekspresi tabel umum (CTE) saat direferensikan dalam kueri SQL. Anda mereferensikan UDTF dalam FROM
klausul pernyataan SQL, dan Anda dapat menautkan operator Spark SQL tambahan ke hasilnya.
UDTF terdaftar ke SparkSession lokal dan diisolasi di notebook atau tingkat pekerjaan.
UDTF didukung pada komputasi yang dikonfigurasi dengan mode akses bersama yang ditetapkan atau tanpa isolasi. Anda tidak dapat menggunakan UDTF pada mode akses bersama.
Anda tidak dapat mendaftarkan UDTF sebagai objek di Unity Catalog, dan UDTF tidak dapat digunakan dengan gudang SQL.
Apa sintaks dasar untuk UDTF?
Apache Spark mengimplementasikan UDTF Python sebagai kelas Python dengan metode wajib eval
.
Anda memancarkan hasil sebagai baris menggunakan yield
.
Agar Apache Spark menggunakan kelas Anda sebagai UDTF, Anda harus mengimpor fungsi PySpark udtf
.
Databricks merekomendasikan penggunaan fungsi ini sebagai dekorator dan selalu secara eksplisit menentukan nama dan jenis bidang menggunakan returnType
opsi .
Contoh berikut membuat tabel sederhana dari input skalar menggunakan UDTF:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
def eval(self, x: int, y: int):
yield x + y, x - y
SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# | 3| -1|
# +----+-----+
Anda dapat menggunakan sintaks Python *args
dan menerapkan logika untuk menangani jumlah nilai input yang tidak ditentukan. Contoh berikut mengembalikan hasil yang sama sambil secara eksplisit memeriksa panjang input dan jenis untuk argumen:
@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# | 3| -1|
# +----+-----+
Mendaftarkan UDTF
Anda dapat mendaftarkan UDTF ke SparkSession saat ini untuk digunakan dalam kueri SQL menggunakan sintaks berikut:
spark.udtf.register("<udtf-sql-name>", <udtf-python-name>)
Contoh berikut mendaftarkan UDTF Python ke SQL:
spark.udtf.register("simple_udtf", SimpleUDTF)
Setelah terdaftar, Anda dapat menggunakan UDTF di SQL menggunakan %sql
perintah atau spark.sql()
fungsi ajaib, seperti dalam contoh berikut:
%sql
SELECT * FROM simple_udtf(1,2);
spark.sql("SELECT * FROM simple_udtf(1,2);")
Menghasilkan hasil
UDTF Python diimplementasikan dengan yield
untuk mengembalikan hasil. Hasil selalu dikembalikan sebagai tabel yang berisi 0 baris atau lebih dengan skema yang ditentukan.
Saat meneruskan argumen skalar, logika dalam eval
metode berjalan persis sekali dengan kumpulan argumen skalar yang diteruskan. Untuk argumen tabel, eval
metode berjalan sekali untuk setiap baris dalam tabel input.
Logika dapat ditulis untuk mengembalikan 0, 1, atau banyak baris per input.
UDTF berikut menunjukkan mengembalikan 0 baris atau lebih untuk setiap input dengan memisahkan item dari daftar yang dipisahkan koma menjadi entri terpisah:
from pyspark.sql.functions import udtf
@udtf(returnType="id: int, item: string")
class Itemize:
def eval(self, id: int, item_list: str):
items = item_list.split(",")
for item in items:
if item != "":
yield id, item
Meneruskan argumen tabel ke UDTF
Anda dapat menggunakan kata kunci TABLE()
SQL untuk meneruskan argumen tabel ke UDTF. Anda bisa menggunakan nama tabel atau kueri, seperti dalam contoh berikut:
TABLE(table_name);
TABLE(SELECT * FROM table_name);
Argumen tabel diproses satu baris dalam satu waktu. Anda dapat menggunakan anotasi bidang kolom PySpark standar untuk berinteraksi dengan kolom di setiap baris. Contoh berikut menunjukkan secara eksplisit mengimpor jenis PySpark Row
lalu memfilter tabel yang diteruskan pada id
bidang :
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
# +---+
# | id|
# +---+
# | 6|
# | 7|
# | 8|
# | 9|
# +---+
Meneruskan argumen skalar ke UDTF
Anda dapat meneruskan argumen skalar ke UDTF menggunakan kombinasi nilai berikut:
- Konstanta skalar
- Fungsi bernilai skalar
- Bidang dalam relasi
Untuk meneruskan bidang dalam relasi, Anda harus mendaftarkan UDTF dan menggunakan kata kunci SQL LATERAL
.
Catatan
Anda dapat menggunakan alias tabel sebaris untuk memisahkan kolom.
Contoh berikut menunjukkan penggunaan LATERAL
untuk meneruskan bidang dari tabel ke UDTF:
from pyspark.sql.functions import udtf
@udtf(returnType="id: int, item: string")
class Itemize:
def eval(self, id: int, item_list: str):
items = item_list.split(",")
for item in items:
if item != "":
yield id, item
spark.udtf.register("itemize", Itemize)
spark.sql("""
SELECT b.id, b.item FROM VALUES (1, 'pots,pans,forks'),
(2, 'spoons,'),
(3, ''),
(4, 'knives,cups') t(id, item_list),
LATERAL itemize(id, item_list) b
""").show()
Mengatur nilai default untuk UDTF
Anda dapat secara opsional menerapkan __init__
metode untuk mengatur nilai default untuk variabel kelas yang dapat Anda referensikan dalam logika Python Anda.
Metode __init__
ini tidak menerima argumen apa pun dan tidak memiliki akses ke variabel atau informasi status dalam SparkSession.
Menggunakan Apache Arrow dengan UDTF
Databricks merekomendasikan penggunaan Apache Arrow untuk UDTF yang menerima sejumlah kecil data sebagai input tetapi menghasilkan tabel besar.
Anda dapat mengaktifkan Arrow dengan menentukan useArrow
parameter saat mendeklarasikan UDTF, seperti dalam contoh berikut:
from pyspark.sql.functions import udtf
@udtf(returnType="c1: int, c2: int", useArrow=True)
class PlusOne:
def eval(self, x: int):
yield x, x + 1