Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
Создает определяемую пользователем функцию таблицы (UDTF).
Синтаксис
import pyspark.sql.functions as sf
# As a decorator
@sf.udtf(returnType=<returnType>, useArrow=<useArrow>)
class FunctionClass:
def eval(self, *args):
# function body
yield row_data
# As a function wrapper
sf.udtf(cls=<class>, returnType=<returnType>, useArrow=<useArrow>)
Параметры
| Параметр | Тип | Description |
|---|---|---|
cls |
class |
Необязательно. Класс обработчика определяемых пользователем функций таблицы Python. |
returnType |
pyspark.sql.types.StructType или str |
Необязательно. Возвращаемый тип определяемой пользователем функции таблицы. Это значение может быть либо объектом StructType, либо строкой типа структуры в формате DDL. Если нет, класс обработчика должен предоставить analyze статический метод. |
useArrow |
bool |
Необязательно. Следует ли использовать стрелку для оптимизации сериализации (de). Если для него задано значение None, используется конфигурация Spark "spark.sql.execution.pythonUDTF.arrow". |
Примеры
Пример 1. Базовая реализация UDTF.
from pyspark.sql.functions import udtf
class TestUDTF:
def eval(self, *args):
yield "hello", "world"
test_udtf = udtf(TestUDTF, returnType="c1: string, c2: string")
test_udtf().show()
+-----+-----+
| c1| c2|
+-----+-----+
|hello|world|
+-----+-----+
Пример 2. UDTF с помощью синтаксиса декоратора.
from pyspark.sql.functions import udtf, lit
@udtf(returnType="c1: int, c2: int")
class PlusOne:
def eval(self, x: int):
yield x, x + 1
PlusOne(lit(1)).show()
+---+---+
| c1| c2|
+---+---+
| 1| 2|
+---+---+
Пример 3. UDTF с анализом статического метода.
from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@udtf
class TestUDTFWithAnalyze:
@staticmethod
def analyze(a: AnalyzeArgument, b: AnalyzeArgument) -> AnalyzeResult:
return AnalyzeResult(StructType().add("a", a.dataType).add("b", b.dataType))
def eval(self, a, b):
yield a, b
TestUDTFWithAnalyze(lit(1), lit("x")).show()
+---+---+
| a| b|
+---+---+
| 1| x|
+---+---+
Пример 4. UDTF с аргументами ключевых слов.
from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@udtf
class TestUDTFWithKwargs:
@staticmethod
def analyze(
a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
) -> AnalyzeResult:
return AnalyzeResult(
StructType().add("a", a.dataType)
.add("b", b.dataType)
.add("x", kwargs["x"].dataType)
)
def eval(self, a, b, **kwargs):
yield a, b, kwargs["x"]
TestUDTFWithKwargs(lit(1), x=lit("x"), b=lit("b")).show()
+---+---+---+
| a| b| x|
+---+---+---+
| 1| b| x|
+---+---+---+
Пример 5. UDTF зарегистрирован и вызывается через SQL.
from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@udtf
class TestUDTFWithKwargs:
@staticmethod
def analyze(
a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
) -> AnalyzeResult:
return AnalyzeResult(
StructType().add("a", a.dataType)
.add("b", b.dataType)
.add("x", kwargs["x"].dataType)
)
def eval(self, a, b, **kwargs):
yield a, b, kwargs["x"]
_ = spark.udtf.register("test_udtf", TestUDTFWithKwargs)
spark.sql("SELECT * FROM test_udtf(1, x => 'x', b => 'b')").show()
+---+---+---+
| a| b| x|
+---+---+---+
| 1| b| x|
+---+---+---+
Пример 6. UDTF с включенной оптимизацией со стрелками.
from pyspark.sql.functions import udtf, lit
@udtf(returnType="c1: int, c2: int", useArrow=True)
class ArrowPlusOne:
def eval(self, x: int):
yield x, x + 1
ArrowPlusOne(lit(1)).show()
+---+---+
| c1| c2|
+---+---+
| 1| 2|
+---+---+
Пример 7. Создание детерминированного UDTF.
from pyspark.sql.functions import udtf
class PlusOne:
def eval(self, a: int):
yield a + 1,
plus_one = udtf(PlusOne, returnType="r: int").asDeterministic()