Udf

Membuat fungsi yang ditentukan pengguna (UDF).

Syntax

import pyspark.sql.functions as sf

# As a decorator
@sf.udf
def function_name(col):
    # function body
    pass

# As a decorator with return type
@sf.udf(returnType=<returnType>, useArrow=<useArrow>)
def function_name(col):
    # function body
    pass

# As a function wrapper
sf.udf(f=<function>, returnType=<returnType>, useArrow=<useArrow>)

Parameter-parameternya

Pengaturan Tipe Description
f function Optional. Fungsi Python jika digunakan sebagai fungsi mandiri.
returnType pyspark.sql.types.DataType atau str Optional. Jenis pengembalian fungsi yang ditentukan pengguna. Nilai dapat berupa objek DataType atau string jenis berformat DDL. Default ke StringType.
useArrow bool Optional. Apakah akan menggunakan Panah untuk mengoptimalkan serialisasi (de).. Ketika tidak ada, konfigurasi Spark "spark.sql.execution.pythonUDF.arrow.enabled" berlaku.

Examples

Contoh 1: Membuat UDF menggunakan lambda, dekorator, dan dekorator dengan jenis pengembalian.

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

slen = udf(lambda s: len(s), IntegerType())

@udf
def to_upper(s):
    if s is not None:
        return s.upper()

@udf(returnType=IntegerType())
def add_one(x):
    if x is not None:
        return x + 1

df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
+----------+--------------+------------+
|slen(name)|to_upper(name)|add_one(age)|
+----------+--------------+------------+
|         8|      JOHN DOE|          22|
+----------+--------------+------------+

Contoh 2: UDF dengan argumen kata kunci.

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, col

@udf(returnType=IntegerType())
def calc(a, b):
    return a + 10 * b

spark.range(2).select(calc(b=col("id") * 10, a=col("id"))).show()
+-----------------------------+
|calc(b => (id * 10), a => id)|
+-----------------------------+
|                            0|
|                          101|
+-----------------------------+

Contoh 3: UDF vektorisasi menggunakan petunjuk jenis Seri panda.

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, col, PandasUDFType
import pandas as pd

@udf(returnType=IntegerType())
def pd_calc(a: pd.Series, b: pd.Series) -> pd.Series:
    return a + 10 * b

pd_calc.evalType == PandasUDFType.SCALAR
spark.range(2).select(pd_calc(b=col("id") * 10, a="id")).show()
+--------------------------------+
|pd_calc(b => (id * 10), a => id)|
+--------------------------------+
|                               0|
|                             101|
+--------------------------------+

Contoh 4: UDF vektorisasi menggunakan petunjuk jenis Array PyArrow.

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, col, ArrowUDFType
import pyarrow as pa

@udf(returnType=IntegerType())
def pa_calc(a: pa.Array, b: pa.Array) -> pa.Array:
    return pa.compute.add(a, pa.compute.multiply(b, 10))

pa_calc.evalType == ArrowUDFType.SCALAR
spark.range(2).select(pa_calc(b=col("id") * 10, a="id")).show()
+--------------------------------+
|pa_calc(b => (id * 10), a => id)|
+--------------------------------+
|                               0|
|                             101|
+--------------------------------+

Contoh 5: Python UDF yang dioptimalkan panah (default sejak Spark 4.2).

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

# Arrow optimization is enabled by default since Spark 4.2
@udf(returnType=IntegerType())
def my_udf(x):
    return x + 1

# To explicitly disable Arrow optimization and use pickle-based serialization:
@udf(returnType=IntegerType(), useArrow=False)
def legacy_udf(x):
    return x + 1

Contoh 6: Membuat UDF non-deterministik.

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
import random

random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()