Bagikan melalui


udtf

Membuat fungsi tabel yang ditentukan pengguna (UDTF).

Syntax

import pyspark.sql.functions as sf

# As a decorator
@sf.udtf(returnType=<returnType>, useArrow=<useArrow>)
class FunctionClass:
    def eval(self, *args):
        # function body
        yield row_data

# As a function wrapper
sf.udtf(cls=<class>, returnType=<returnType>, useArrow=<useArrow>)

Parameter-parameternya

Pengaturan Tipe Description
cls class Optional. Kelas handler fungsi tabel yang ditentukan pengguna Python.
returnType pyspark.sql.types.StructType atau str Optional. Jenis pengembalian fungsi tabel yang ditentukan pengguna. Nilai dapat berupa objek StructType atau string jenis struct berformat DDL. Jika Tidak Ada, kelas handler harus menyediakan analyze metode statis.
useArrow bool Optional. Apakah akan menggunakan Panah untuk mengoptimalkan serialisasi (de).. Saat diatur ke Tidak Ada, konfigurasi Spark "spark.sql.execution.pythonUDTF.arrow.enabled" digunakan.

Examples

Contoh 1: Implementasi UDTF dasar.

from pyspark.sql.functions import udtf

class TestUDTF:
    def eval(self, *args):
        yield "hello", "world"

test_udtf = udtf(TestUDTF, returnType="c1: string, c2: string")
test_udtf().show()
+-----+-----+
|   c1|   c2|
+-----+-----+
|hello|world|
+-----+-----+

Contoh 2: UDTF menggunakan sintaks dekorator.

from pyspark.sql.functions import udtf, lit

@udtf(returnType="c1: int, c2: int")
class PlusOne:
    def eval(self, x: int):
        yield x, x + 1

PlusOne(lit(1)).show()
+---+---+
| c1| c2|
+---+---+
|  1|  2|
+---+---+

Contoh 3: UDTF dengan metode analisis statis.

from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class TestUDTFWithAnalyze:
    @staticmethod
    def analyze(a: AnalyzeArgument, b: AnalyzeArgument) -> AnalyzeResult:
        return AnalyzeResult(StructType().add("a", a.dataType).add("b", b.dataType))

    def eval(self, a, b):
        yield a, b

TestUDTFWithAnalyze(lit(1), lit("x")).show()
+---+---+
|  a|  b|
+---+---+
|  1|  x|
+---+---+

Contoh 4: UDTF dengan argumen kata kunci.

from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class TestUDTFWithKwargs:
    @staticmethod
    def analyze(
        a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
    ) -> AnalyzeResult:
        return AnalyzeResult(
            StructType().add("a", a.dataType)
                .add("b", b.dataType)
                .add("x", kwargs["x"].dataType)
        )

    def eval(self, a, b, **kwargs):
        yield a, b, kwargs["x"]

TestUDTFWithKwargs(lit(1), x=lit("x"), b=lit("b")).show()
+---+---+---+
|  a|  b|  x|
+---+---+---+
|  1|  b|  x|
+---+---+---+

Contoh 5: UDTF terdaftar dan dipanggil melalui SQL.

from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class TestUDTFWithKwargs:
    @staticmethod
    def analyze(
        a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
    ) -> AnalyzeResult:
        return AnalyzeResult(
            StructType().add("a", a.dataType)
                .add("b", b.dataType)
                .add("x", kwargs["x"].dataType)
        )

    def eval(self, a, b, **kwargs):
        yield a, b, kwargs["x"]

_ = spark.udtf.register("test_udtf", TestUDTFWithKwargs)
spark.sql("SELECT * FROM test_udtf(1, x => 'x', b => 'b')").show()
+---+---+---+
|  a|  b|  x|
+---+---+---+
|  1|  b|  x|
+---+---+---+

Contoh 6: UDTF dengan pengoptimalan Panah diaktifkan.

from pyspark.sql.functions import udtf, lit

@udtf(returnType="c1: int, c2: int", useArrow=True)
class ArrowPlusOne:
    def eval(self, x: int):
        yield x, x + 1

ArrowPlusOne(lit(1)).show()
+---+---+
| c1| c2|
+---+---+
|  1|  2|
+---+---+

Contoh 7: Membuat UDTF deterministik.

from pyspark.sql.functions import udtf

class PlusOne:
    def eval(self, a: int):
        yield a + 1,

plus_one = udtf(PlusOne, returnType="r: int").asDeterministic()