Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Mendaftarkan fungsi Python (termasuk fungsi lambda) atau fungsi yang ditentukan pengguna sebagai fungsi SQL.
Sintaksis
register(name, f, returnType=None)
Parameter-parameternya
| Parameter | Tipe | Deskripsi |
|---|---|---|
name |
str | Nama fungsi yang ditentukan pengguna dalam pernyataan SQL. |
f |
fungsi, udf, atau pandas_udf |
Fungsi Python, atau fungsi yang ditentukan pengguna. Fungsi yang ditentukan pengguna dapat berupa row-at-a-time atau vektorisasi. |
returnType |
DataType atau str, opsional | Jenis pengembalian fungsi yang ditentukan pengguna terdaftar. Dapat berupa DataType objek atau string jenis berformat DDL. Hanya valid ketika f adalah fungsi Python biasa, bukan ketika f sudah menjadi fungsi yang ditentukan pengguna. |
Pengembalian Barang
fungsi
Catatan
Untuk mendaftarkan fungsi Python nondeterministik, pertama-tama buat fungsi yang ditentukan pengguna nondeterministik untuk fungsi Python lalu daftarkan sebagai fungsi SQL.
Examples
# Register a lambda as a SQL function (return type defaults to string).
strlen = spark.udf.register("stringLengthString", lambda x: len(x))
spark.sql("SELECT stringLengthString('test')").collect()
# [Row(stringLengthString(test)='4')]
spark.sql("SELECT 'foo' AS text").select(strlen("text")).collect()
# [Row(stringLengthString(text)='3')]
# Register with an explicit return type.
from pyspark.sql.types import IntegerType
spark.udf.register("stringLengthInt", lambda x: len(x), IntegerType())
spark.sql("SELECT stringLengthInt('test')").collect()
# [Row(stringLengthInt(test)=4)]
# Register an existing UDF.
from pyspark.sql.functions import udf
slen = udf(lambda s: len(s), IntegerType())
spark.udf.register("slen", slen)
spark.sql("SELECT slen('test')").collect()
# [Row(slen(test)=4)]
# Register a nondeterministic UDF.
import random
random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
spark.udf.register("random_udf", random_udf)
# Register a pandas UDF.
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
return s + 1
spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(id) FROM range(3)").collect()
# [Row(add_one(id)=1), Row(add_one(id)=2), Row(add_one(id)=3)]
# Register a grouped aggregate pandas UDF.
@pandas_udf("integer")
def sum_udf(v: pd.Series) -> int:
return v.sum()
spark.udf.register("sum_udf", sum_udf)
spark.sql(
"SELECT sum_udf(v1) FROM VALUES (3, 0), (2, 0), (1, 1) tbl(v1, v2) GROUP BY v2"
).sort("sum_udf(v1)").collect()
# [Row(sum_udf(v1)=1), Row(sum_udf(v1)=5)]