Udostępnij przez


udtf

Tworzy funkcję tabeli zdefiniowanej przez użytkownika (UDTF).

Składnia

import pyspark.sql.functions as sf

# As a decorator
@sf.udtf(returnType=<returnType>, useArrow=<useArrow>)
class FunctionClass:
    def eval(self, *args):
        # function body
        yield row_data

# As a function wrapper
sf.udtf(cls=<class>, returnType=<returnType>, useArrow=<useArrow>)

Parametry

Parameter Typ Description
cls class Opcjonalny. Klasa obsługi funkcji tabeli zdefiniowana przez użytkownika języka Python.
returnType pyspark.sql.types.StructType lub str Opcjonalny. Zwracany typ funkcji tabeli zdefiniowanej przez użytkownika. Wartość może być obiektem StructType lub ciągiem typu struktury sformatowanym w formacie DDL. Jeśli brak, klasa obsługi musi podać analyze metodę statyczną.
useArrow bool Opcjonalny. Czy używać strzałki do optymalizowania serializacji (de). Gdy jest ustawiona wartość Brak, używana jest konfiguracja platformy Spark "spark.sql.execution.pythonUDTF.arrow.enabled".

Przykłady

Przykład 1. Podstawowa implementacja udTF.

from pyspark.sql.functions import udtf

class TestUDTF:
    def eval(self, *args):
        yield "hello", "world"

test_udtf = udtf(TestUDTF, returnType="c1: string, c2: string")
test_udtf().show()
+-----+-----+
|   c1|   c2|
+-----+-----+
|hello|world|
+-----+-----+

Przykład 2: UDTF przy użyciu składni dekoratora.

from pyspark.sql.functions import udtf, lit

@udtf(returnType="c1: int, c2: int")
class PlusOne:
    def eval(self, x: int):
        yield x, x + 1

PlusOne(lit(1)).show()
+---+---+
| c1| c2|
+---+---+
|  1|  2|
+---+---+

Przykład 3: funkcja UDTF z analizą metody statycznej.

from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class TestUDTFWithAnalyze:
    @staticmethod
    def analyze(a: AnalyzeArgument, b: AnalyzeArgument) -> AnalyzeResult:
        return AnalyzeResult(StructType().add("a", a.dataType).add("b", b.dataType))

    def eval(self, a, b):
        yield a, b

TestUDTFWithAnalyze(lit(1), lit("x")).show()
+---+---+
|  a|  b|
+---+---+
|  1|  x|
+---+---+

Przykład 4: UDTF z argumentami słowa kluczowego.

from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class TestUDTFWithKwargs:
    @staticmethod
    def analyze(
        a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
    ) -> AnalyzeResult:
        return AnalyzeResult(
            StructType().add("a", a.dataType)
                .add("b", b.dataType)
                .add("x", kwargs["x"].dataType)
        )

    def eval(self, a, b, **kwargs):
        yield a, b, kwargs["x"]

TestUDTFWithKwargs(lit(1), x=lit("x"), b=lit("b")).show()
+---+---+---+
|  a|  b|  x|
+---+---+---+
|  1|  b|  x|
+---+---+---+

Przykład 5: zarejestrowany i wywoływany za pośrednictwem języka SQL przez program UDTF.

from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class TestUDTFWithKwargs:
    @staticmethod
    def analyze(
        a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
    ) -> AnalyzeResult:
        return AnalyzeResult(
            StructType().add("a", a.dataType)
                .add("b", b.dataType)
                .add("x", kwargs["x"].dataType)
        )

    def eval(self, a, b, **kwargs):
        yield a, b, kwargs["x"]

_ = spark.udtf.register("test_udtf", TestUDTFWithKwargs)
spark.sql("SELECT * FROM test_udtf(1, x => 'x', b => 'b')").show()
+---+---+---+
|  a|  b|  x|
+---+---+---+
|  1|  b|  x|
+---+---+---+

Przykład 6: funkcja UDTF z włączoną optymalizacją strzałek.

from pyspark.sql.functions import udtf, lit

@udtf(returnType="c1: int, c2: int", useArrow=True)
class ArrowPlusOne:
    def eval(self, x: int):
        yield x, x + 1

ArrowPlusOne(lit(1)).show()
+---+---+
| c1| c2|
+---+---+
|  1|  2|
+---+---+

Przykład 7. Tworzenie deterministycznej funkcji UDTF.

from pyspark.sql.functions import udtf

class PlusOne:
    def eval(self, a: int):
        yield a + 1,

plus_one = udtf(PlusOne, returnType="r: int").asDeterministic()