Bagikan melalui


Fungsi tabel yang ditentukan pengguna Python (UDTF)

Penting

Fitur ini ada dalam Pratinjau Umum di Databricks Runtime 14.3 LTS dan versi di atasnya.

Fungsi tabel yang ditentukan pengguna (UDTF) memungkinkan Anda mendaftarkan fungsi yang mengembalikan tabel alih-alih nilai skalar. Tidak seperti fungsi skalar yang mengembalikan nilai hasil tunggal dari setiap panggilan, setiap UDTF dipanggil dalam klausul pernyataan FROM SQL dan mengembalikan seluruh tabel sebagai output.

Setiap panggilan UDTF dapat menerima argumen nol atau lebih. Argumen ini dapat berupa ekspresi skalar atau argumen tabel yang mewakili seluruh tabel input.

UDTF dapat didaftarkan dengan dua cara:

Petunjuk

Databricks merekomendasikan pendaftaran UDTF di Unity Catalog untuk memanfaatkan tata kelola terpusat yang memudahkan untuk berbagi dan menggunakan kembali fungsi dengan aman di seluruh pengguna dan tim.

Sintaks UDTF dasar

Apache Spark mengimplementasikan UDTF Python sebagai kelas Python dengan metode eval wajib yang menggunakan yield untuk menghasilkan baris keluaran.

Untuk menggunakan kelas Anda sebagai UDTF, Anda harus mengimpor fungsi udtf PySpark. Databricks merekomendasikan penggunaan fungsi ini sebagai dekorator dan secara eksplisit menentukan nama dan jenis bidang menggunakan opsi returnType (kecuali kelas menentukan metode analyze seperti yang dijelaskan di bagian selanjutnya).

UDTF berikut membuat tabel menggunakan daftar tetap dari dua argumen bilangan bulat:

from pyspark.sql.functions import lit, udtf

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()
+----+-----+
| sum| diff|
+----+-----+
|   3|   -1|
+----+-----+

Daftarkan UDTF

Untuk mendaftarkan UDTF dengan cakupan sesi untuk digunakan dalam kueri SQL, gunakan spark.udtf.register(). Berikan nama untuk fungsi SQL dan kelas UDTF Python.

spark.udtf.register("get_sum_diff", GetSumDiff)

Memanggil UDTF yang terdaftar

Setelah terdaftar, Anda dapat menggunakan UDTF di SQL menggunakan perintah magic %sql atau fungsi spark.sql():

spark.udtf.register("get_sum_diff", GetSumDiff)
spark.sql("SELECT * FROM get_sum_diff(1,2);").show()
%sql
SELECT * FROM get_sum_diff(1,2);

Meningkatkan UDTF dengan cakupan sesi ke Katalog Unity

Penting

Mendaftarkan UDTF Python di Unity Catalog sedang dalam Pratinjau Umum. Unity Catalog UDTF memerlukan Databricks Runtime versi 17.1 ke atas. Lihat Persyaratan.

Anda dapat meningkatkan UDTF dengan cakupan sesi ke Unity Catalog untuk memanfaatkan tata kelola terpusat dan memudahkan untuk berbagi dan menggunakan kembali fungsi dengan aman di seluruh pengguna dan tim.

Untuk meningkatkan UDTF dengan cakupan sesi ke Unity Catalog, gunakan SQL DDL dengan CREATE OR REPLACE FUNCTION pernyataan . Contoh berikut menunjukkan cara mengonversi GetSumDiff UDTF dari fungsi cakupan sesi ke fungsi Katalog Unity:

CREATE OR REPLACE FUNCTION get_sum_diff(x INT, y INT)
RETURNS TABLE (sum INT, diff INT)
LANGUAGE PYTHON
HANDLER 'GetSumDiff'
AS $$
class GetSumDiff:
    def eval(self, x: int, y: int):
        yield x + y, x - y
$$;

SELECT * FROM get_sum_diff(10, 3);
+-----+------+
| sum | diff |
+-----+------+
| 13  | 7    |
+-----+------+

Untuk informasi selengkapnya tentang Unity Catalog UDTF, lihat Fungsi tabel yang ditentukan pengguna (UDTF) Python di Unity Catalog.

Gunakan Panah Apache

Jika UDTF Anda menerima sejumlah kecil data sebagai input tetapi menghasilkan tabel besar, Databricks merekomendasikan penggunaan Apache Arrow. Anda dapat mengaktifkannya dengan menentukan parameter useArrow saat mendeklarasikan UDTF:

@udtf(returnType="c1: int, c2: int", useArrow=True)

Daftar argumen variabel - *args dan **kwargs

Anda dapat menggunakan sintaks Python *args atau **kwargs dan menerapkan logika untuk menangani jumlah nilai input yang tidak ditentukan.

Contoh berikut mengembalikan hasil yang sama sambil secara eksplisit memeriksa panjang input dan jenis untuk argumen:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, *args):
        assert(len(args) == 2)
        assert(isinstance(arg, int) for arg in args)
        x = args[0]
        y = args[1]
        yield x + y, x - y

GetSumDiff(lit(1), lit(2)).show()

Berikut adalah contoh yang sama, tetapi menggunakan argumen kata kunci:

@udtf(returnType="sum: int, diff: int")
class GetSumDiff:
    def eval(self, **kwargs):
        x = kwargs["x"]
        y = kwargs["y"]
        yield x + y, x - y

GetSumDiff(x=lit(1), y=lit(2)).show()

Menentukan skema statis pada waktu pendaftaran

UDTF mengembalikan baris dengan skema output yang terdiri dari urutan nama dan jenis kolom. Jika skema UDTF harus selalu tetap sama untuk semua kueri, Anda dapat menentukan skema tetap statis setelah dekorator @udtf. Ini haruslah berupa StructType:

StructType().add("c1", StringType())

Atau string DDL yang mewakili jenis struct:

c1: string

Menghitung skema dinamis pada waktu panggilan fungsi

UDTF juga dapat menghitung skema output secara terprogram untuk setiap panggilan tergantung pada nilai argumen input. Untuk melakukan ini, tentukan metode statis yang disebut analyze yang menerima parameter nol atau lebih yang sesuai dengan argumen yang disediakan untuk panggilan UDTF tertentu.

Setiap argumen metode analyze adalah instans kelas AnalyzeArgument yang berisi bidang berikut:

kolom kelas AnalyzeArgument Deskripsi
dataType Jenis argumen input adalah DataType. Untuk argumen tabel input, ini adalah StructType yang mewakili kolom tabel.
value Nilai dari argumen input adalah Optional[Any]. Ini None untuk argumen tabel atau argumen skalar harfiah yang tidak konstan.
isTable Apakah argumen input berupa tabel BooleanType.
isConstantExpression Apakah argumen input adalah ekspresi yang dapat dilipat secara konstan sebagai BooleanType.

Metode analyze mengembalikan instance dari kelas AnalyzeResult, yang mencakup skema tabel hasil sebagai StructType serta beberapa bidang opsional. Jika UDTF menerima argumen tabel input, maka AnalyzeResult juga dapat menyertakan cara yang diminta untuk mempartisi dan mengurutkan baris tabel input di beberapa panggilan UDTF, seperti yang dijelaskan nanti.

kolom kelas AnalyzeResult Deskripsi
schema Skema tabel hasil adalah StructType.
withSinglePartition Apakah semua baris input akan dikirim ke instans kelas UDTF yang sama dengan BooleanType.
partitionBy Apabila diatur agar tidak kosong, semua baris dengan setiap kombinasi unik nilai dari ekspresi partisi digunakan oleh instans terpisah dari kelas UDTF.
orderBy Jika diatur ke tidak kosong, ini menentukan urutan baris dalam setiap partisi.
select Jika diatur ke kondisi tidak kosong, ini adalah urutan ekspresi yang ditentukan oleh UDTF agar Catalyst mengevaluasi terhadap kolom-kolom dalam argumen input TABLE. UDTF menerima satu atribut input untuk setiap nama dalam daftar dalam urutan yang tercantum.

Contoh analyze ini mengembalikan satu kolom output untuk setiap kata dalam argumen string input.

from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult


@udtf
class MyUDTF:
  @staticmethod
  def analyze(text: AnalyzeArgument) -> AnalyzeResult:
    schema = StructType()
    for index, word in enumerate(sorted(list(set(text.value.split(" "))))):
      schema = schema.add(f"word_{index}", IntegerType())
    return AnalyzeResult(schema=schema)

  def eval(self, text: str):
    counts = {}
    for word in text.split(" "):
      if word not in counts:
            counts[word] = 0
      counts[word] += 1
    result = []
    for word in sorted(list(set(text.split(" ")))):
      result.append(counts[word])
    yield result

MyUDTF(lit("hello world")).columns
['word_0', 'word_1']

Meneruskan status ke panggilan eval mendatang

Metode analyze dapat berfungsi sebagai tempat yang nyaman untuk melakukan inisialisasi dan kemudian meneruskan hasilnya ke pemanggilan metode eval di masa mendatang untuk panggilan UDTF yang sama.

Untuk melakukannya, buat subkelas AnalyzeResult dan kembalikan instans subkelas dari metode analyze. Kemudian, tambahkan argumen tambahan ke metode __init__ untuk menerima instans tersebut.

Contoh analyze ini mengembalikan skema output konstan, tetapi menambahkan informasi kustom dalam metadata hasil yang akan digunakan oleh panggilan metode __init__ di masa mendatang:

from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import StructType, IntegerType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@dataclass
class AnalyzeResultWithBuffer(AnalyzeResult):
    buffer: str = ""

@udtf
class TestUDTF:
  def __init__(self, analyze_result=None):
    self._total = 0
    if analyze_result is not None:
      self._buffer = analyze_result.buffer
    else:
      self._buffer = ""

  @staticmethod
  def analyze(argument, _) -> AnalyzeResult:
    if (
      argument.value is None
      or argument.isTable
      or not isinstance(argument.value, str)
      or len(argument.value) == 0
    ):
      raise Exception("The first argument must be a non-empty string")
    assert argument.dataType == StringType()
    assert not argument.isTable
    return AnalyzeResultWithBuffer(
      schema=StructType()
        .add("total", IntegerType())
        .add("buffer", StringType()),
      withSinglePartition=True,
      buffer=argument.value,
    )

  def eval(self, argument, row: Row):
    self._total += 1

  def terminate(self):
    yield self._total, self._buffer

spark.udtf.register("test_udtf", TestUDTF)

spark.sql(
  """
  WITH t AS (
    SELECT id FROM range(1, 21)
  )
  SELECT total, buffer
  FROM test_udtf("abc", TABLE(t))
  """
).show()
+-------+-------+
| count | buffer|
+-------+-------+
|    20 |  "abc"|
+-------+-------+

Hasilkan baris keluaran

Metode eval berjalan sekali untuk setiap baris argumen tabel input (atau hanya sekali jika tidak ada argumen tabel yang disediakan), diikuti oleh satu pemanggilan metode terminate di akhir. Salah satu metode menghasilkan nol atau lebih baris yang sesuai dengan skema hasil dengan menghasilkan tuple, daftar, atau objek pyspark.sql.Row.

Contoh ini mengembalikan baris dengan memberikan tuple yang terdiri dari tiga elemen.

def eval(self, x, y, z):
  yield (x, y, z)

Anda juga dapat menghilangkan tanda kurung:

def eval(self, x, y, z):
  yield x, y, z

Tambahkan koma berikutnya untuk mengembalikan baris hanya dengan satu kolom:

def eval(self, x, y, z):
  yield x,

Anda juga dapat menghasilkan objek pyspark.sql.Row.

def eval(self, x, y, z):
  from pyspark.sql.types import Row
  yield Row(x, y, z)

Contoh ini menghasilkan baris output dari metode terminate menggunakan daftar Python. Anda dapat menyimpan status di dalam kelas dari langkah-langkah sebelumnya dalam evaluasi UDTF untuk tujuan ini.

def terminate(self):
  yield [self.x, self.y, self.z]

Meneruskan argumen skalar ke fungsi UDTF

Anda dapat meneruskan argumen skalar ke UDTF sebagai ekspresi konstanta yang terdiri dari nilai atau fungsi harfiah berdasarkannya. Misalnya:

SELECT * FROM get_sum_diff(1, y => 2)

Meneruskan argumen tabel ke UDTF

UDTF Python dapat menerima tabel input sebagai argumen selain argumen input skalar. Satu UDTF juga dapat menerima argumen tabel dan beberapa argumen skalar.

Kemudian setiap kueri SQL dapat menyediakan tabel input menggunakan kata kunci TABLE diikuti oleh tanda kurung di sekitar pengidentifikasi tabel yang sesuai, seperti TABLE(t). Atau, Anda dapat meneruskan subkueri tabel, seperti TABLE(SELECT a, b, c FROM t) atau TABLE(SELECT t1.a, t2.b FROM t1 INNER JOIN t2 USING (key)).

Argumen tabel input kemudian direpresentasikan sebagai argumen pyspark.sql.Row ke metode eval, dengan satu panggilan ke metode eval untuk setiap baris dalam tabel input. Anda dapat menggunakan anotasi bidang kolom PySpark standar untuk berinteraksi dengan kolom di setiap baris. Contoh berikut menunjukkan secara eksplisit mengimpor tipe PySpark Row lalu memfilter tabel yang diberikan pada bidang id.

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="id: int")
class FilterUDTF:
    def eval(self, row: Row):
        if row["id"] > 5:
            yield row["id"],

spark.udtf.register("filter_udtf", FilterUDTF)

Untuk mengkueri fungsi, gunakan kata kunci SQL TABLE:

SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)));
+---+
| id|
+---+
|  6|
|  7|
|  8|
|  9|
+---+

Tentukan pemartisian baris input dari panggilan fungsi

Saat memanggil UDTF dengan argumen tabel, kueri SQL apa pun dapat mempartisi tabel input di beberapa panggilan UDTF berdasarkan nilai satu atau beberapa kolom tabel input.

Untuk menentukan partisi, gunakan klausa PARTITION BY dalam panggilan fungsi setelah argumen TABLE. Ini menjamin bahwa semua baris input dengan setiap kombinasi nilai unik kolom partisi akan dikonsumsi oleh tepat satu instans kelas UDTF.

Perhatikan bahwa selain referensi kolom sederhana, klausa PARTITION BY juga menerima ekspresi arbitrer berdasarkan kolom tabel input. Misalnya, Anda dapat menentukan LENGTH string, mengekstrak sebulan dari tanggal, atau menggabungkan dua nilai.

Dimungkinkan juga untuk menentukan WITH SINGLE PARTITION alih-alih PARTITION BY untuk meminta hanya satu partisi di mana semua baris input harus digunakan oleh tepat satu instans kelas UDTF.

Dalam setiap partisi, Anda dapat secara opsional menentukan urutan baris input yang diperlukan saat metode UDTF eval mengonsumsinya. Untuk melakukannya, berikan klausa ORDER BY setelah klausa PARTITION BY atau WITH SINGLE PARTITION yang dijelaskan di atas.

Sebagai contoh, perhatikan UDTF berikut:

from pyspark.sql.functions import udtf
from pyspark.sql.types import Row

@udtf(returnType="a: string, b: int")
class FilterUDTF:
  def __init__(self):
    self.key = ""
    self.max = 0

  def eval(self, row: Row):
    self.key = row["a"]
    self.max = max(self.max, row["b"])

  def terminate(self):
    yield self.key, self.max

spark.udtf.register("filter_udtf", FilterUDTF)

Anda dapat menentukan opsi partisi saat memanggil UDTF melalui tabel input dengan beberapa cara:

-- Create an input table with some example values.
DROP TABLE IF EXISTS values_table;
CREATE TABLE values_table (a STRING, b INT);
INSERT INTO values_table VALUES ('abc', 2), ('abc', 4), ('def', 6), ('def', 8);
SELECT * FROM values_table;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 2  |
| "abc" | 4  |
| "def" | 6  |
| "def" | 8  |
+-------+----+
-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique value in the `a` column are processed by the same
-- instance of the UDTF class. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY a ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "abc" | 4  |
| "def" | 8  |
+-------+----+

-- Query the UDTF with the input table as an argument and a directive to partition the input
-- rows such that all rows with each unique result of evaluating the "LENGTH(a)" expression are
-- processed by the same instance of the UDTF class. Within each partition, the rows are ordered
-- by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) PARTITION BY LENGTH(a) ORDER BY b) ORDER BY 1;
+-------+---+
|     a | b |
+-------+---+
| "def" | 8 |
+-------+---+
-- Query the UDTF with the input table as an argument and a directive to consider all the input
-- rows in one single partition such that exactly one instance of the UDTF class consumes all of
-- the input rows. Within each partition, the rows are ordered by the `b` column.
SELECT * FROM filter_udtf(TABLE(values_table) WITH SINGLE PARTITION ORDER BY b) ORDER BY 1;
+-------+----+
|     a |  b |
+-------+----+
| "def" | 8 |
+-------+----+

Tentukan partisi baris input dari metode analyze

Perhatikan bahwa untuk setiap cara di atas untuk mempartisi tabel input saat memanggil UDTF dalam kueri SQL, ada cara yang sesuai untuk metode UDTF analyze untuk menentukan metode partisi yang sama secara otomatis sebagai gantinya.

  • Alih-alih memanggil UDTF sebagai SELECT * FROM udtf(TABLE(t) PARTITION BY a), Anda dapat memperbarui metode analyze untuk mengatur bidang partitionBy=[PartitioningColumn("a")] dan hanya memanggil fungsi menggunakan SELECT * FROM udtf(TABLE(t)).
  • Dengan token yang sama, alih-alih menentukan TABLE(t) WITH SINGLE PARTITION ORDER BY b dalam kueri SQL, Anda dapat membuat analyze mengatur bidang withSinglePartition=true dan orderBy=[OrderingColumn("b")] lalu hanya meneruskan TABLE(t).
  • Alih-alih meneruskan TABLE(SELECT a FROM t) dalam kueri SQL, Anda dapat membuat analyze mengatur select=[SelectedColumn("a")] lalu hanya meneruskan TABLE(t).

Dalam contoh berikut, analyze mengembalikan skema output konstanta, memilih subset kolom dari tabel input, dan menentukan bahwa tabel input dipartisi di beberapa panggilan UDTF berdasarkan nilai kolom date:

@staticmethod
def analyze(*args) -> AnalyzeResult:
  """
  The input table will be partitioned across several UDTF calls based on the monthly
  values of each `date` column. The rows within each partition will arrive ordered by the `date`
  column. The UDTF will only receive the `date` and `word` columns from the input table.
  """
  from pyspark.sql.functions import (
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
  )

  assert len(args) == 1, "This function accepts one argument only"
  assert args[0].isTable, "Only table arguments are supported"
  return AnalyzeResult(
    schema=StructType()
      .add("month", DateType())
      .add("longest_word", IntegerType()),
    partitionBy=[
      PartitioningColumn("extract(month from date)")],
    orderBy=[
      OrderingColumn("date")],
    select=[
      SelectedColumn("date"),
      SelectedColumn(
        name="length(word)",
        alias="length_word")])