Vad är användardefinierade tabellfunktioner i Python?
Viktigt!
Den här funktionen finns som allmänt tillgänglig förhandsversion.
Med en användardefinierad tabellfunktion (UDTF) kan du registrera funktioner som returnerar tabeller i stället för skalära värden. UDF:er fungerar på samma sätt som vanliga tabelluttryck (CTE) när de refereras i SQL-frågor. Du refererar till UDF:er i satsen i FROM
en SQL-instruktion och du kan länka ytterligare Spark SQL-operatorer till resultatet.
UDF:er är registrerade på den lokala SparkSession och är isolerade på notebook- eller jobbnivå.
UDF:er stöds för beräkning som konfigurerats med tilldelade eller icke-isoleringsbaserade delade åtkomstlägen. Du kan inte använda UDF:er i läget för delad åtkomst.
Du kan inte registrera UDF:er som objekt i Unity Catalog, och UDF:er kan inte användas med SQL-lager.
Vad är den grundläggande syntaxen för en UDTF?
Apache Spark implementerar Python UDTFs som Python-klasser med en obligatorisk eval
metod.
Du genererar resultat som rader med hjälp av yield
.
För att Apache Spark ska kunna använda din klass som UDTF måste du importera funktionen PySpark udtf
.
Databricks rekommenderar att du använder den här funktionen som dekoratör och alltid uttryckligen anger fältnamn och typer med hjälp av returnType
alternativet .
I följande exempel skapas en enkel tabell från skalära indata med hjälp av en UDTF:
from pyspark.sql.functions import lit, udtf
@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
def eval(self, x: int, y: int):
yield x + y, x - y
SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# | 3| -1|
# +----+-----+
Du kan använda Python-syntax *args
och implementera logik för att hantera ett ospecificerat antal indatavärden. I följande exempel returneras samma resultat när du uttryckligen kontrollerar indatalängden och typerna för argumenten:
@udtf(returnType="sum: int, diff: int")
class SimpleUDTF:
def eval(self, *args):
assert(len(args) == 2)
assert(isinstance(arg, int) for arg in args)
x = args[0]
y = args[1]
yield x + y, x - y
SimpleUDTF(lit(1), lit(2)).show()
# +----+-----+
# | sum| diff|
# +----+-----+
# | 3| -1|
# +----+-----+
Registrera en UDTF
Du kan registrera en UDTF till den aktuella SparkSession för användning i SQL-frågor med hjälp av följande syntax:
spark.udtf.register("<udtf-sql-name>", <udtf-python-name>)
I följande exempel registreras en Python UDTF till SQL:
spark.udtf.register("simple_udtf", SimpleUDTF)
När du har registrerat dig kan du använda UDTF i SQL med antingen det %sql
magiska kommandot eller spark.sql()
funktionen, som i följande exempel:
%sql
SELECT * FROM simple_udtf(1,2);
spark.sql("SELECT * FROM simple_udtf(1,2);")
Ger resultat
Python-UDF:er implementeras med yield
för att returnera resultat. Resultatet returneras alltid som en tabell som innehåller 0 eller fler rader med det angivna schemat.
När skalära argument skickas körs logiken eval
i metoden exakt en gång med uppsättningen skalära argument som skickas. För tabellargument eval
körs metoden en gång för varje rad i indatatabellen.
Logik kan skrivas för att returnera 0, 1 eller många rader per indata.
Följande UDTF visar hur du returnerar 0 eller fler rader för varje indata genom att separera objekt från en kommaavgränsad lista i separata poster:
from pyspark.sql.functions import udtf
@udtf(returnType="id: int, item: string")
class Itemize:
def eval(self, id: int, item_list: str):
items = item_list.split(",")
for item in items:
if item != "":
yield id, item
Skicka ett tabellargument till en UDTF
Du kan använda SQL-nyckelordet TABLE()
för att skicka ett tabellargument till en UDTF. Du kan använda ett tabellnamn eller en fråga, som i följande exempel:
TABLE(table_name);
TABLE(SELECT * FROM table_name);
Tabellargument bearbetas en rad i taget. Du kan använda standardkommentarer för PySpark-kolumnfält för att interagera med kolumner i varje rad. I följande exempel visas explicit import av PySpark-typen Row
och sedan filtrering av den skickade tabellen i id
fältet:
from pyspark.sql.functions import udtf
from pyspark.sql.types import Row
@udtf(returnType="id: int")
class FilterUDTF:
def eval(self, row: Row):
if row["id"] > 5:
yield row["id"],
spark.udtf.register("filter_udtf", FilterUDTF)
spark.sql("SELECT * FROM filter_udtf(TABLE(SELECT * FROM range(10)))").show()
# +---+
# | id|
# +---+
# | 6|
# | 7|
# | 8|
# | 9|
# +---+
Skicka skalära argument till en UDTF
Du kan skicka skalära argument till en UDTF med valfri kombination av följande värden:
- Skalära konstanter
- Skalärfunktioner
- Fält i en relation
Om du vill skicka fält i en relation måste du registrera UDTF och använda SQL-nyckelordet LATERAL
.
Kommentar
Du kan använda infogade tabellalias för att skilja kolumner åt.
I följande exempel visas hur du använder LATERAL
för att skicka fält från en tabell till en UDTF:
from pyspark.sql.functions import udtf
@udtf(returnType="id: int, item: string")
class Itemize:
def eval(self, id: int, item_list: str):
items = item_list.split(",")
for item in items:
if item != "":
yield id, item
spark.udtf.register("itemize", Itemize)
spark.sql("""
SELECT b.id, b.item FROM VALUES (1, 'pots,pans,forks'),
(2, 'spoons,'),
(3, ''),
(4, 'knives,cups') t(id, item_list),
LATERAL itemize(id, item_list) b
""").show()
Ange standardvärden för UDF:er
Du kan också implementera en __init__
metod för att ange standardvärden för klassvariabler som du kan referera till i Python-logiken.
Metoden __init__
accepterar inga argument och har ingen åtkomst till variabler eller tillståndsinformation i SparkSession.
Använda Apache Arrow med UDF:er
Databricks rekommenderar att du använder Apache Arrow för UDF:er som tar emot en liten mängd data som indata men matar ut en stor tabell.
Du kan aktivera Pil genom att ange parametern useArrow
när du deklarerar UDTF, som i följande exempel:
from pyspark.sql.functions import udtf
@udtf(returnType="c1: int, c2: int", useArrow=True)
class PlusOne:
def eval(self, x: int):
yield x, x + 1