Oharra
Baimena behar duzu orria atzitzeko. Direktorioetan saioa has dezakezu edo haiek alda ditzakezu.
Baimena behar duzu orria atzitzeko. Direktorioak alda ditzakezu.
Crea una función de tabla definida por el usuario (UDTF).
Syntax
import pyspark.sql.functions as sf
# As a decorator
@sf.udtf(returnType=<returnType>, useArrow=<useArrow>)
class FunctionClass:
def eval(self, *args):
# function body
yield row_data
# As a function wrapper
sf.udtf(cls=<class>, returnType=<returnType>, useArrow=<useArrow>)
Parámetros
| Parámetro | Tipo | Description |
|---|---|---|
cls |
class |
Optional. Clase de controlador de funciones de tabla definidas por el usuario de Python. |
returnType |
pyspark.sql.types.StructType o str |
Optional. Tipo de valor devuelto de la función de tabla definida por el usuario. El valor puede ser un objeto StructType o una cadena de tipo de estructura con formato DDL. Si es None, la clase de controlador debe proporcionar el analyze método estático. |
useArrow |
bool |
Optional. Indica si se debe usar Arrow para optimizar las serializaciones (de). Cuando se establece en None, se usa la configuración de Spark "spark.sql.execution.pythonUDTF.arrow.enabled". |
Examples
Ejemplo 1: Implementación básica de UDTF.
from pyspark.sql.functions import udtf
class TestUDTF:
def eval(self, *args):
yield "hello", "world"
test_udtf = udtf(TestUDTF, returnType="c1: string, c2: string")
test_udtf().show()
+-----+-----+
| c1| c2|
+-----+-----+
|hello|world|
+-----+-----+
Ejemplo 2: UDTF mediante la sintaxis del decorador.
from pyspark.sql.functions import udtf, lit
@udtf(returnType="c1: int, c2: int")
class PlusOne:
def eval(self, x: int):
yield x, x + 1
PlusOne(lit(1)).show()
+---+---+
| c1| c2|
+---+---+
| 1| 2|
+---+---+
Ejemplo 3: UDTF con el método estático de análisis.
from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@udtf
class TestUDTFWithAnalyze:
@staticmethod
def analyze(a: AnalyzeArgument, b: AnalyzeArgument) -> AnalyzeResult:
return AnalyzeResult(StructType().add("a", a.dataType).add("b", b.dataType))
def eval(self, a, b):
yield a, b
TestUDTFWithAnalyze(lit(1), lit("x")).show()
+---+---+
| a| b|
+---+---+
| 1| x|
+---+---+
Ejemplo 4: UDTF con argumentos de palabra clave.
from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@udtf
class TestUDTFWithKwargs:
@staticmethod
def analyze(
a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
) -> AnalyzeResult:
return AnalyzeResult(
StructType().add("a", a.dataType)
.add("b", b.dataType)
.add("x", kwargs["x"].dataType)
)
def eval(self, a, b, **kwargs):
yield a, b, kwargs["x"]
TestUDTFWithKwargs(lit(1), x=lit("x"), b=lit("b")).show()
+---+---+---+
| a| b| x|
+---+---+---+
| 1| b| x|
+---+---+---+
Ejemplo 5: UDTF registrado y llamado a través de SQL.
from pyspark.sql.functions import udtf, lit
from pyspark.sql.types import StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult
@udtf
class TestUDTFWithKwargs:
@staticmethod
def analyze(
a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
) -> AnalyzeResult:
return AnalyzeResult(
StructType().add("a", a.dataType)
.add("b", b.dataType)
.add("x", kwargs["x"].dataType)
)
def eval(self, a, b, **kwargs):
yield a, b, kwargs["x"]
_ = spark.udtf.register("test_udtf", TestUDTFWithKwargs)
spark.sql("SELECT * FROM test_udtf(1, x => 'x', b => 'b')").show()
+---+---+---+
| a| b| x|
+---+---+---+
| 1| b| x|
+---+---+---+
Ejemplo 6: UDTF con la optimización de flecha habilitada.
from pyspark.sql.functions import udtf, lit
@udtf(returnType="c1: int, c2: int", useArrow=True)
class ArrowPlusOne:
def eval(self, x: int):
yield x, x + 1
ArrowPlusOne(lit(1)).show()
+---+---+
| c1| c2|
+---+---+
| 1| 2|
+---+---+
Ejemplo 7: Crear un UDTF determinista.
from pyspark.sql.functions import udtf
class PlusOne:
def eval(self, a: int):
yield a + 1,
plus_one = udtf(PlusOne, returnType="r: int").asDeterministic()