Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Membuat fungsi tabel yang ditentukan pengguna (UDTF).
Syntax
import pyspark.sql.functions as sf
# As a decorator
@sf.udtf(returnType=<returnType>, useArrow=<useArrow>)
class FunctionClass:
def eval(self, *args):
# function body
yield row_data
# As a function wrapper
sf.udtf(cls=<class>, returnType=<returnType>, useArrow=<useArrow>)
Parameter-parameternya
| Pengaturan | Tipe | Description |
|---|---|---|
cls |
class |
Optional. Kelas handler fungsi tabel yang ditentukan pengguna Python. |
returnType |
pyspark.sql.types.StructType atau str |
Optional. Jenis pengembalian fungsi tabel yang ditentukan pengguna. Nilai dapat berupa objek StructType atau string jenis struct berformat DDL. Jika Tidak Ada, kelas handler harus menyediakan analyze metode statis. |
useArrow |
bool |
Optional. Apakah akan menggunakan Panah untuk mengoptimalkan serialisasi (de).. Saat diatur ke Tidak Ada, konfigurasi Spark "spark.sql.execution.pythonUDTF.arrow.enabled" digunakan. |
Examples
Contoh 1: Implementasi UDTF dasar.
from pyspark.sql.functions import udtf
class TestUDTF:
def eval(self, *args):
yield "hello", "world"
test_udtf = udtf(TestUDTF, returnType="c1: string, c2: string")
test_udtf().show()
+-----+-----+
| c1| c2|
+-----+-----+
|hello|world|
+-----+-----+
Contoh 2: UDTF menggunakan sintaks dekorator.
from pyspark.sql.functions import udtf, lit
@udtf(returnType="c1: int, c2: int")
class PlusOne:
def eval(self, x: int):
yield x, x + 1
PlusOne(lit(1)).show()
+---+---+
| c1| c2|
+---+---+
| 1| 2|
+---+---+
Contoh 3: UDTF dengan metode analisis statis.
from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@udtf
class TestUDTFWithAnalyze:
@staticmethod
def analyze(a: AnalyzeArgument, b: AnalyzeArgument) -> AnalyzeResult:
return AnalyzeResult(StructType().add("a", a.dataType).add("b", b.dataType))
def eval(self, a, b):
yield a, b
TestUDTFWithAnalyze(lit(1), lit("x")).show()
+---+---+
| a| b|
+---+---+
| 1| x|
+---+---+
Contoh 4: UDTF dengan argumen kata kunci.
from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@udtf
class TestUDTFWithKwargs:
@staticmethod
def analyze(
a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
) -> AnalyzeResult:
return AnalyzeResult(
StructType().add("a", a.dataType)
.add("b", b.dataType)
.add("x", kwargs["x"].dataType)
)
def eval(self, a, b, **kwargs):
yield a, b, kwargs["x"]
TestUDTFWithKwargs(lit(1), x=lit("x"), b=lit("b")).show()
+---+---+---+
| a| b| x|
+---+---+---+
| 1| b| x|
+---+---+---+
Contoh 5: UDTF terdaftar dan dipanggil melalui SQL.
from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@udtf
class TestUDTFWithKwargs:
@staticmethod
def analyze(
a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
) -> AnalyzeResult:
return AnalyzeResult(
StructType().add("a", a.dataType)
.add("b", b.dataType)
.add("x", kwargs["x"].dataType)
)
def eval(self, a, b, **kwargs):
yield a, b, kwargs["x"]
_ = spark.udtf.register("test_udtf", TestUDTFWithKwargs)
spark.sql("SELECT * FROM test_udtf(1, x => 'x', b => 'b')").show()
+---+---+---+
| a| b| x|
+---+---+---+
| 1| b| x|
+---+---+---+
Contoh 6: UDTF dengan pengoptimalan Panah diaktifkan.
from pyspark.sql.functions import udtf, lit
@udtf(returnType="c1: int, c2: int", useArrow=True)
class ArrowPlusOne:
def eval(self, x: int):
yield x, x + 1
ArrowPlusOne(lit(1)).show()
+---+---+
| c1| c2|
+---+---+
| 1| 2|
+---+---+
Contoh 7: Membuat UDTF deterministik.
from pyspark.sql.functions import udtf
class PlusOne:
def eval(self, a: int):
yield a + 1,
plus_one = udtf(PlusOne, returnType="r: int").asDeterministic()