Bagikan melalui


Apa itu fungsi tabel yang ditentukan pengguna Python?

Penting

Fitur ini ada di Pratinjau Publik.

Fungsi tabel yang ditentukan pengguna (UDTF) memungkinkan Anda mendaftarkan fungsi yang mengembalikan tabel alih-alih nilai skalar. UDTF berfungsi mirip dengan ekspresi tabel umum (CTE) saat direferensikan dalam kueri SQL. Anda mereferensikan UDTF dalam FROM klausul pernyataan SQL, dan Anda dapat menautkan operator Spark SQL tambahan ke hasilnya.

UDTF terdaftar ke SparkSession lokal dan diisolasi di notebook atau tingkat pekerjaan.

UDTF didukung pada komputasi yang dikonfigurasi dengan mode akses bersama yang ditetapkan atau tanpa isolasi. Anda tidak dapat menggunakan UDTF pada mode akses bersama.

Anda tidak dapat mendaftarkan UDTF sebagai objek di Unity Catalog, dan UDTF tidak dapat digunakan dengan gudang SQL.

Apa sintaks dasar untuk UDTF?

Apache Spark mengimplementasikan UDTF Python sebagai kelas Python dengan metode wajib eval .

Anda memancarkan hasil sebagai baris menggunakan yield.

Agar Apache Spark menggunakan kelas Anda sebagai UDTF, Anda harus mengimpor fungsi PySpark udtf .

Databricks merekomendasikan penggunaan fungsi ini sebagai dekorator dan selalu secara eksplisit menentukan nama dan jenis bidang menggunakan returnType opsi .

Contoh berikut membuat tabel sederhana dari input skalar menggunakan UDTF:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, x: int, y: int):
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

Anda dapat menggunakan sintaks Python *args dan menerapkan logika untuk menangani jumlah nilai input yang tidak ditentukan. Contoh berikut mengembalikan hasil yang sama sambil secara eksplisit memeriksa panjang input dan jenis untuk argumen:

@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# |   3|   -1|
# +----+-----+

Mendaftarkan UDTF

Anda dapat mendaftarkan UDTF ke SparkSession saat ini untuk digunakan dalam kueri SQL menggunakan sintaks berikut:

spark.udtf.register("<udtf-sql-name>", <udtf-python-name>)

Contoh berikut mendaftarkan UDTF Python ke SQL:

spark.udtf.register("simple_udtf", SimpleUDTF)

Setelah terdaftar, Anda dapat menggunakan UDTF di SQL menggunakan %sql perintah atau spark.sql() fungsi ajaib, seperti dalam contoh berikut:

%sql
SELECT * FROM simple_udtf(1,2);
spark.sql("SELECT * FROM simple_udtf(1,2);")

Menghasilkan hasil

UDTF Python diimplementasikan dengan yield untuk mengembalikan hasil. Hasil selalu dikembalikan sebagai tabel yang berisi 0 baris atau lebih dengan skema yang ditentukan.

Saat meneruskan argumen skalar, logika dalam eval metode berjalan persis sekali dengan kumpulan argumen skalar yang diteruskan. Untuk argumen tabel, eval metode berjalan sekali untuk setiap baris dalam tabel input.

Logika dapat ditulis untuk mengembalikan 0, 1, atau banyak baris per input.

UDTF berikut menunjukkan mengembalikan 0 baris atau lebih untuk setiap input dengan memisahkan item dari daftar yang dipisahkan koma menjadi entri terpisah:

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

Meneruskan argumen tabel ke UDTF

Anda dapat menggunakan kata kunci TABLE() SQL untuk meneruskan argumen tabel ke UDTF. Anda bisa menggunakan nama tabel atau kueri, seperti dalam contoh berikut:

TABLE(table_name);
TABLE(SELECT * FROM table_name);

Argumen tabel diproses satu baris dalam satu waktu. Anda dapat menggunakan anotasi bidang kolom PySpark standar untuk berinteraksi dengan kolom di setiap baris. Contoh berikut menunjukkan secara eksplisit mengimpor jenis PySpark Row lalu memfilter tabel yang diteruskan pada id bidang :

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
# +---+
# | id|
# +---+
# |  6|
# |  7|
# |  8|
# |  9|
# +---+

Meneruskan argumen skalar ke UDTF

Anda dapat meneruskan argumen skalar ke UDTF menggunakan kombinasi nilai berikut:

  • Konstanta skalar
  • Fungsi bernilai skalar
  • Bidang dalam relasi

Untuk meneruskan bidang dalam relasi, Anda harus mendaftarkan UDTF dan menggunakan kata kunci SQL LATERAL .

Catatan

Anda dapat menggunakan alias tabel sebaris untuk memisahkan kolom.

Contoh berikut menunjukkan penggunaan LATERAL untuk meneruskan bidang dari tabel ke UDTF:

from pyspark.sql.functions import udtf

@udtf(returnType="id: int, item: string")
class Itemize:
    def eval(self, id: int, item_list: str):
        items = item_list.split(",")
        for item in items:
            if item != "":
                yield id, item

spark.udtf.register("itemize", Itemize)

spark.sql("""
    SELECT b.id, b.item FROM VALUES (1, 'pots,pans,forks'),
    (2, 'spoons,'),
    (3, ''),
    (4, 'knives,cups') t(id, item_list),
    LATERAL itemize(id, item_list) b
""").show()

Mengatur nilai default untuk UDTF

Anda dapat secara opsional menerapkan __init__ metode untuk mengatur nilai default untuk variabel kelas yang dapat Anda referensikan dalam logika Python Anda.

Metode __init__ ini tidak menerima argumen apa pun dan tidak memiliki akses ke variabel atau informasi status dalam SparkSession.

Menggunakan Apache Arrow dengan UDTF

Databricks merekomendasikan penggunaan Apache Arrow untuk UDTF yang menerima sejumlah kecil data sebagai input tetapi menghasilkan tabel besar.

Anda dapat mengaktifkan Arrow dengan menentukan useArrow parameter saat mendeklarasikan UDTF, seperti dalam contoh berikut:

from pyspark.sql.functions import udtf

@udtf(returnType="c1: int, c2: int", useArrow=True)
class PlusOne:
    def eval(self, x: int):
        yield x, x + 1