Κοινοποίηση μέσω


udtf

Creates a user-defined table function (UDTF).

Syntax

import pyspark.sql.functions as sf

# As a decorator
@sf.udtf(returnType=<returnType>, useArrow=<useArrow>)
class FunctionClass:
    def eval(self, *args):
        # function body
        yield row_data

# As a function wrapper
sf.udtf(cls=<class>, returnType=<returnType>, useArrow=<useArrow>)

Parameters

Parameter Type Description
cls class Optional. The Python user-defined table function handler class.
returnType pyspark.sql.types.StructType or str Optional. The return type of the user-defined table function. The value can be either a StructType object or a DDL-formatted struct type string. If None, the handler class must provide an `analyze` static method.
useArrow bool Optional. Whether to use Arrow to optimize (de)serialization. When set to None, the Spark config "spark.sql.execution.pythonUDTF.arrow.enabled" is used instead.

Examples

Example 1: Basic UDTF implementation.

from pyspark.sql.functions import udtf

class TestUDTF:
    """Minimal UDTF handler that ignores its arguments and emits one fixed row."""

    def eval(self, *args):
        """Yield a single two-column row regardless of what was passed in."""
        row = ("hello", "world")
        yield row

# Wrap the handler class, supplying the return schema as a DDL-formatted string.
test_udtf = udtf(TestUDTF, returnType="c1: string, c2: string")
# Call the UDTF with no arguments and display the resulting rows.
test_udtf().show()
+-----+-----+
|   c1|   c2|
+-----+-----+
|hello|world|
+-----+-----+

Example 2: UDTF using decorator syntax.

from pyspark.sql.functions import udtf, lit

@udtf(returnType="c1: int, c2: int")
class PlusOne:
    """Handler that emits the input value alongside its successor."""

    def eval(self, x: int):
        """Yield one row: ``x`` for column ``c1`` and ``x + 1`` for column ``c2``."""
        successor = x + 1
        yield x, successor

# Invoke the decorated UDTF on the literal column 1 and display the result.
PlusOne(lit(1)).show()
+---+---+
| c1| c2|
+---+---+
|  1|  2|
+---+---+

Example 3: UDTF with analyze static method.

from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class TestUDTFWithAnalyze:
    """Handler whose output schema is built from the data types of its arguments."""

    @staticmethod
    def analyze(a: AnalyzeArgument, b: AnalyzeArgument) -> AnalyzeResult:
        """Return a two-column schema ("a", "b") typed after the two arguments."""
        schema = StructType().add("a", a.dataType)
        schema = schema.add("b", b.dataType)
        return AnalyzeResult(schema)

    def eval(self, a, b):
        """Yield the two inputs back as a single row."""
        row = (a, b)
        yield row

# Call with an integer literal and a string literal, then display the result.
TestUDTFWithAnalyze(lit(1), lit("x")).show()
+---+---+
|  a|  b|
+---+---+
|  1|  x|
+---+---+

Example 4: UDTF with keyword arguments.

from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class TestUDTFWithKwargs:
    """Handler taking arguments ``a`` and ``b`` plus a keyword argument "x"."""

    @staticmethod
    def analyze(
        a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
    ) -> AnalyzeResult:
        """Return a schema with columns "a", "b" and "x", typed after the inputs."""
        schema = StructType().add("a", a.dataType)
        schema = schema.add("b", b.dataType)
        schema = schema.add("x", kwargs["x"].dataType)
        return AnalyzeResult(schema)

    def eval(self, a, b, **kwargs):
        """Yield one row made of ``a``, ``b`` and the keyword argument "x"."""
        x_value = kwargs["x"]
        yield a, b, x_value

# Pass "a" positionally and "x" and "b" by keyword (in that order), then display the result.
TestUDTFWithKwargs(lit(1), x=lit("x"), b=lit("b")).show()
+---+---+---+
|  a|  b|  x|
+---+---+---+
|  1|  b|  x|
+---+---+---+

Example 5: UDTF registered and called via SQL.

from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class TestUDTFWithKwargs:
    """Handler taking arguments ``a`` and ``b`` plus a keyword argument "x"."""

    @staticmethod
    def analyze(
        a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
    ) -> AnalyzeResult:
        """Return a schema with columns "a", "b" and "x", typed after the inputs."""
        output_schema = StructType()
        output_schema = output_schema.add("a", a.dataType)
        output_schema = output_schema.add("b", b.dataType)
        output_schema = output_schema.add("x", kwargs["x"].dataType)
        return AnalyzeResult(output_schema)

    def eval(self, a, b, **kwargs):
        """Yield one row made of ``a``, ``b`` and the keyword argument "x"."""
        row = (a, b, kwargs["x"])
        yield row

# Register the UDTF under the SQL name "test_udtf"; the return value is discarded.
_ = spark.udtf.register("test_udtf", TestUDTFWithKwargs)
# Call it from SQL, passing "x" and "b" as named arguments.
spark.sql("SELECT * FROM test_udtf(1, x => 'x', b => 'b')").show()
+---+---+---+
|  a|  b|  x|
+---+---+---+
|  1|  b|  x|
+---+---+---+

Example 6: UDTF with Arrow optimization enabled.

from pyspark.sql.functions import udtf, lit

@udtf(returnType="c1: int, c2: int", useArrow=True)
class ArrowPlusOne:
    """Handler, declared with ``useArrow=True``, that emits the input and its successor."""

    def eval(self, x: int):
        """Yield one row: ``x`` for column ``c1`` and ``x + 1`` for column ``c2``."""
        successor = x + 1
        yield x, successor

# Invoke the Arrow-enabled UDTF on the literal column 1 and display the result.
ArrowPlusOne(lit(1)).show()
+---+---+
| c1| c2|
+---+---+
|  1|  2|
+---+---+

Example 7: Creating a deterministic UDTF.

from pyspark.sql.functions import udtf

class PlusOne:
    """Handler that emits a single one-column row holding ``a + 1``."""

    def eval(self, a: int):
        """Yield the successor of ``a`` wrapped in a 1-tuple."""
        incremented = a + 1
        yield (incremented,)

# Wrap the handler with a one-column schema and mark the resulting UDTF as deterministic.
plus_one = udtf(PlusOne, returnType="r: int").asDeterministic()