Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Rejestruje funkcję Python (w tym funkcje lambda) lub funkcję zdefiniowaną przez użytkownika jako funkcję SQL.
Składnia
register(name, f, returnType=None)
Parametry
| Parameter | Typ | Opis |
|---|---|---|
name |
str | Nazwa funkcji zdefiniowanej przez użytkownika w instrukcjach SQL. |
f |
funkcja, udflub pandas_udf |
Funkcja Python lub funkcja zdefiniowana przez użytkownika. Funkcja zdefiniowana przez użytkownika może być wierszem w czasie lub wektoryzowana. |
returnType |
Typ danych lub str, opcjonalnie | Zwracany typ zarejestrowanej funkcji zdefiniowanej przez użytkownika. Może być obiektem DataType lub ciągiem typu w formacie DDL. Tylko wtedy, gdy f jest zwykłą funkcją Python, a nie wtedy, gdy f jest już funkcją zdefiniowaną przez użytkownika. |
Zwroty
funkcja
Notatki
Aby zarejestrować nieokreśloną funkcję Python, najpierw skompiluj nieokreśloną funkcję zdefiniowaną przez użytkownika dla funkcji Python, a następnie zarejestruj ją jako funkcję SQL.
Examples
# Register a lambda as a SQL function (return type defaults to string).
strlen = spark.udf.register("stringLengthString", lambda x: len(x))
spark.sql("SELECT stringLengthString('test')").collect()
# [Row(stringLengthString(test)='4')]
spark.sql("SELECT 'foo' AS text").select(strlen("text")).collect()
# [Row(stringLengthString(text)='3')]
# Register with an explicit return type.
from pyspark.sql.types import IntegerType
spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
spark.sql("SELECT stringLengthInt('test')").collect()
# [Row(stringLengthInt(test)=4)]
# Register an existing UDF.
from pyspark.sql.functions import udf
slen = udf(lambda s: len(s), IntegerType())
spark.udf.register("slen", slen)
spark.sql("SELECT slen('test')").collect()
# [Row(slen(test)=4)]
# Register a nondeterministic UDF.
import random
random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
spark.udf.register("random_udf", random_udf)
# Register a pandas UDF.
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
return s + 1
spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(id) FROM range(3)").collect()
# [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
# Register a grouped aggregate pandas UDF.
@pandas_udf("integer")
def sum_udf(v: pd.Series) -> int:
return v.sum()
spark.udf.register("sum_udf", sum_udf)
spark.sql(
"SELECT sum_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
).sort("sum_udf(v1)").collect()
# [Row(sum_udf(v1)=1), Row(sum_udf(v1)=5)]