Anmerkung
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen, dich anzumelden oder die Verzeichnisse zu wechseln.
Der Zugriff auf diese Seite erfordert eine Genehmigung. Du kannst versuchen , die Verzeichnisse zu wechseln.
Erstellt eine benutzerdefinierte Funktion (UDF).
Syntax
import pyspark.sql.functions as sf
# As a decorator
@sf.udf
def function_name(col):
# function body
pass
# As a decorator with return type
@sf.udf(returnType=<returnType>, useArrow=<useArrow>)
def function_name(col):
# function body
pass
# As a function wrapper
sf.udf(f=<function>, returnType=<returnType>, useArrow=<useArrow>)
Die Parameter
| Parameter | Typ | Description |
|---|---|---|
f |
function |
Wahlfrei. Python-Funktion, wenn sie als eigenständige Funktion verwendet wird. |
returnType |
pyspark.sql.types.DataType oder str |
Wahlfrei. Der Rückgabetyp der benutzerdefinierten Funktion. Der Wert kann ein DataType-Objekt oder eine DDL-formatierte Typzeichenfolge sein. Standardwert ist StringType. |
useArrow |
bool |
Wahlfrei. Gibt an, ob Pfeil verwendet werden soll, um die Serialisierung (de)zu optimieren. Wenn "None" lautet, wird die Spark-Konfiguration "spark.sql.execution.pythonUDF.arrow.enabled" wirksam. |
Examples
Beispiel 1: Erstellen von UDFs mit Lambda-, Dekorier- und Dekorierer mit Rückgabetyp.
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
slen = udf(lambda s: len(s), IntegerType())
@udf
def to_upper(s):
if s is not None:
return s.upper()
@udf(returnType=IntegerType())
def add_one(x):
if x is not None:
return x + 1
df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()
+----------+--------------+------------+
|slen(name)|to_upper(name)|add_one(age)|
+----------+--------------+------------+
| 8| JOHN DOE| 22|
+----------+--------------+------------+
Beispiel 2: UDF mit Schlüsselwortargumenten.
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, col
@udf(returnType=IntegerType())
def calc(a, b):
return a + 10 * b
spark.range(2).select(calc(b=col("id") * 10, a=col("id"))).show()
+-----------------------------+
|calc(b => (id * 10), a => id)|
+-----------------------------+
| 0|
| 101|
+-----------------------------+
Beispiel 3: Vektorisierte UDF mit Pandas-Typhinweisen.
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, col, PandasUDFType
import pandas as pd
@udf(returnType=IntegerType())
def pd_calc(a: pd.Series, b: pd.Series) -> pd.Series:
return a + 10 * b
pd_calc.evalType == PandasUDFType.SCALAR
spark.range(2).select(pd_calc(b=col("id") * 10, a="id")).show()
+--------------------------------+
|pd_calc(b => (id * 10), a => id)|
+--------------------------------+
| 0|
| 101|
+--------------------------------+
Beispiel 4: Vektorisierte UDF mit PyArrow-Arraytyphinweisen.
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, col, ArrowUDFType
import pyarrow as pa
@udf(returnType=IntegerType())
def pa_calc(a: pa.Array, b: pa.Array) -> pa.Array:
return pa.compute.add(a, pa.compute.multiply(b, 10))
pa_calc.evalType == ArrowUDFType.SCALAR
spark.range(2).select(pa_calc(b=col("id") * 10, a="id")).show()
+--------------------------------+
|pa_calc(b => (id * 10), a => id)|
+--------------------------------+
| 0|
| 101|
+--------------------------------+
Beispiel 5: Pfeiloptimierte Python-UDF (Standard seit Spark 4.2).
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
# Arrow optimization is enabled by default since Spark 4.2
@udf(returnType=IntegerType())
def my_udf(x):
return x + 1
# To explicitly disable Arrow optimization and use pickle-based serialization:
@udf(returnType=IntegerType(), useArrow=False)
def legacy_udf(x):
return x + 1
Beispiel 6: Erstellen einer nicht deterministischen UDF.
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
import random
random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()