다음을 통해 공유


Udf

UDF(사용자 정의 함수)를 만듭니다.

문법

import pyspark.sql.functions as sf

# As a decorator
@sf.udf
def function_name(col):
    # function body
    pass

# As a decorator with return type
@sf.udf(returnType=<returnType>, useArrow=<useArrow>)
def function_name(col):
    # function body
    pass

# As a function wrapper
sf.udf(f=<function>, returnType=<returnType>, useArrow=<useArrow>)

매개 변수

매개 변수 유형 Description
f function Optional. 독립 실행형 함수로 사용되는 경우 Python 함수입니다.
returnType pyspark.sql.types.DataType 또는 str Optional. 사용자 정의 함수의 반환 형식입니다. 값은 DataType 개체 또는 DDL 형식 형식 문자열일 수 있습니다. 기본값은 StringType입니다.
useArrow bool Optional. 화살표를 사용하여 (de) serialization을 최적화할지 여부입니다. None이면 Spark 구성 "spark.sql.execution.pythonUDF.arrow.enabled"가 적용됩니다.

예시

예제 1: 반환 형식의 람다, 데코레이터 및 데코레이터를 사용하여 UDF 만들기

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

slen = udf(lambda s: len(s), IntegerType())

@udf
def to_upper(s):
    if s is not None:
        return s.upper()

@udf(returnType=IntegerType())
def add_one(x):
    if x is not None:
        return x + 1

df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
+----------+--------------+------------+
|slen(name)|to_upper(name)|add_one(age)|
+----------+--------------+------------+
|         8|      JOHN DOE|          22|
+----------+--------------+------------+

예제 2: 키워드 인수가 있는 UDF입니다.

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, col

@udf(returnType=IntegerType())
def calc(a, b):
    return a + 10 * b

spark.range(2).select(calc(b=col("id") * 10, a=col("id"))).show()
+-----------------------------+
|calc(b => (id * 10), a => id)|
+-----------------------------+
|                            0|
|                          101|
+-----------------------------+

예제 3: pandas Series 형식 힌트를 사용하여 UDF를 벡터화했습니다.

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, col, PandasUDFType
import pandas as pd

@udf(returnType=IntegerType())
def pd_calc(a: pd.Series, b: pd.Series) -> pd.Series:
    return a + 10 * b

pd_calc.evalType == PandasUDFType.SCALAR
spark.range(2).select(pd_calc(b=col("id") * 10, a="id")).show()
+--------------------------------+
|pd_calc(b => (id * 10), a => id)|
+--------------------------------+
|                               0|
|                             101|
+--------------------------------+

예제 4: PyArrow 배열 형식 힌트를 사용하여 벡터화된 UDF입니다.

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, col, ArrowUDFType
import pyarrow as pa

@udf(returnType=IntegerType())
def pa_calc(a: pa.Array, b: pa.Array) -> pa.Array:
    return pa.compute.add(a, pa.compute.multiply(b, 10))

pa_calc.evalType == ArrowUDFType.SCALAR
spark.range(2).select(pa_calc(b=col("id") * 10, a="id")).show()
+--------------------------------+
|pa_calc(b => (id * 10), a => id)|
+--------------------------------+
|                               0|
|                             101|
+--------------------------------+

예제 5: 화살표 최적화 Python UDF(Spark 4.2 이후 기본값)

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

# Arrow optimization is enabled by default since Spark 4.2
@udf(returnType=IntegerType())
def my_udf(x):
    return x + 1

# To explicitly disable Arrow optimization and use pickle-based serialization:
@udf(returnType=IntegerType(), useArrow=False)
def legacy_udf(x):
    return x + 1

예제 6: 비결정적 UDF 만들기

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
import random

random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()