Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Registrerar en Python funktion (inklusive lambda-funktioner) eller en användardefinierad funktion som en SQL-funktion.
Syntax
register(name, f, returnType=None)
Parameters
| Parameter | Type | Beskrivning |
|---|---|---|
name |
str | Namnet på den användardefinierade funktionen i SQL-instruktioner. |
f |
function, udf, eller pandas_udf |
En Python funktion eller en användardefinierad funktion. Den användardefinierade funktionen kan antingen vara rad i taget eller vektoriserad. |
returnType |
DataType eller str, valfritt | Returtypen för den registrerade användardefinierade funktionen. Kan vara ett DataType objekt eller en DDL-formaterad typsträng. Endast giltig när f är en vanlig Python funktion, inte när f redan är en användardefinierad funktion. |
Retur
function
Notes
Om du vill registrera en nondeterministisk Python-funktion skapar du först en icke-förutbestämd användardefinierad funktion för funktionen Python och registrerar den sedan som en SQL-funktion.
Exempel
# Register a lambda as a SQL function (return type defaults to string).
strlen = spark.udf.register("stringLengthString", lambda x: len(x))
spark.sql("SELECT stringLengthString('test')").collect()
# [Row(stringLengthString(test)='4')]
spark.sql("SELECT 'foo' AS text").select(strlen("text")).collect()
# [Row(stringLengthString(text)='3')]
# Register with an explicit return type.
from pyspark.sql.types import IntegerType
spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
spark.sql("SELECT stringLengthInt('test')").collect()
# [Row(stringLengthInt(test)=4)]
# Register an existing UDF.
from pyspark.sql.functions import udf
slen = udf(lambda s: len(s), IntegerType())
spark.udf.register("slen", slen)
spark.sql("SELECT slen('test')").collect()
# [Row(slen(test)=4)]
# Register a nondeterministic UDF.
import random
random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
spark.udf.register("random_udf", random_udf)
# Register a pandas UDF.
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
return s + 1
spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(id) FROM range(3)").collect()
# [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
# Register a grouped aggregate pandas UDF.
@pandas_udf("integer")
def sum_udf(v: pd.Series) -> int:
return v.sum()
spark.udf.register("sum_udf", sum_udf)
spark.sql(
"SELECT sum_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
).sort("sum_udf(v1)").collect()
# [Row(sum_udf(v1)=1), Row(sum_udf(v1)=5)]