Поделиться через


udtf

Создает определяемую пользователем функцию таблицы (UDTF).

Синтаксис

import pyspark.sql.functions as sf

# As a decorator
@sf.udtf(returnType=<returnType>, useArrow=<useArrow>)
class FunctionClass:
    def eval(self, *args):
        # function body
        yield row_data

# As a function wrapper
sf.udtf(cls=<class>, returnType=<returnType>, useArrow=<useArrow>)

Параметры

Параметр Тип Description
cls class Необязательно. Класс обработчика определяемых пользователем функций таблицы Python.
returnType pyspark.sql.types.StructType или str Необязательно. Возвращаемый тип определяемой пользователем функции таблицы. Это значение может быть либо объектом StructType, либо строкой типа структуры в формате DDL. Если нет, класс обработчика должен предоставить analyze статический метод.
useArrow bool Необязательно. Следует ли использовать стрелку для оптимизации сериализации (de). Если для него задано значение None, используется конфигурация Spark "spark.sql.execution.pythonUDTF.arrow".

Примеры

Пример 1. Базовая реализация UDTF.

from pyspark.sql.functions import udtf

class TestUDTF:
    def eval(self, *args):
        yield "hello", "world"

test_udtf = udtf(TestUDTF, returnType="c1: string, c2: string")
test_udtf().show()
+-----+-----+
|   c1|   c2|
+-----+-----+
|hello|world|
+-----+-----+

Пример 2. UDTF с помощью синтаксиса декоратора.

from pyspark.sql.functions import udtf, lit

@udtf(returnType="c1: int, c2: int")
class PlusOne:
    def eval(self, x: int):
        yield x, x + 1

PlusOne(lit(1)).show()
+---+---+
| c1| c2|
+---+---+
|  1|  2|
+---+---+

Пример 3. UDTF с анализом статического метода.

from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class TestUDTFWithAnalyze:
    @staticmethod
    def analyze(a: AnalyzeArgument, b: AnalyzeArgument) -> AnalyzeResult:
        return AnalyzeResult(StructType().add("a", a.dataType).add("b", b.dataType))

    def eval(self, a, b):
        yield a, b

TestUDTFWithAnalyze(lit(1), lit("x")).show()
+---+---+
|  a|  b|
+---+---+
|  1|  x|
+---+---+

Пример 4. UDTF с аргументами ключевых слов.

from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class TestUDTFWithKwargs:
    @staticmethod
    def analyze(
        a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
    ) -> AnalyzeResult:
        return AnalyzeResult(
            StructType().add("a", a.dataType)
                .add("b", b.dataType)
                .add("x", kwargs["x"].dataType)
        )

    def eval(self, a, b, **kwargs):
        yield a, b, kwargs["x"]

TestUDTFWithKwargs(lit(1), x=lit("x"), b=lit("b")).show()
+---+---+---+
|  a|  b|  x|
+---+---+---+
|  1|  b|  x|
+---+---+---+

Пример 5. UDTF зарегистрирован и вызывается через SQL.

from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult

@udtf
class TestUDTFWithKwargs:
    @staticmethod
    def analyze(
        a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
    ) -> AnalyzeResult:
        return AnalyzeResult(
            StructType().add("a", a.dataType)
                .add("b", b.dataType)
                .add("x", kwargs["x"].dataType)
        )

    def eval(self, a, b, **kwargs):
        yield a, b, kwargs["x"]

_ = spark.udtf.register("test_udtf", TestUDTFWithKwargs)
spark.sql("SELECT * FROM test_udtf(1, x => 'x', b => 'b')").show()
+---+---+---+
|  a|  b|  x|
+---+---+---+
|  1|  b|  x|
+---+---+---+

Пример 6. UDTF с включенной оптимизацией со стрелками.

from pyspark.sql.functions import udtf, lit

@udtf(returnType="c1: int, c2: int", useArrow=True)
class ArrowPlusOne:
    def eval(self, x: int):
        yield x, x + 1

ArrowPlusOne(lit(1)).show()
+---+---+
| c1| c2|
+---+---+
|  1|  2|
+---+---+

Пример 7. Создание детерминированного UDTF.

from pyspark.sql.functions import udtf

class PlusOne:
    def eval(self, a: int):
        yield a + 1,

plus_one = udtf(PlusOne, returnType="r: int").asDeterministic()